"""
Feature engineering for the AA Crew Sequences Bad Weather XGBoost model.

Pipeline:
    1. Load raw BTS parquet files
    2. Build airport-level weather risk features (by airport x month)
    3. Construct sequence pairs: inbound (A→DFW) + outbound (DFW→B) on the same
       day, aggregated to (airport_A, airport_B, date) rather than flight pairs
    4. Build pair-level features
    5. Label pairs as high-risk (target)
    6. Aggregate to (airport_A, airport_B, Month, Year) and save the feature
       matrix for model training
"""
import os

import numpy as np
import pandas as pd

RAW_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "raw")
PROCESSED_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "processed")
os.makedirs(PROCESSED_DIR, exist_ok=True)

# Thresholds
WEATHER_DELAY_THRESHOLD_MIN = 15   # minutes to count as a weather delay event
TURNAROUND_MAX_HRS = 4             # max hours between inbound arrival and outbound departure
TURNAROUND_MIN_HRS = 0.5           # min turnaround (shorter = invalid / not a real sequence)


# ---------------------------------------------------------------------------
# Step 1: Load all raw data
# ---------------------------------------------------------------------------

def load_raw() -> pd.DataFrame:
    """Load every selected BTS DFW parquet file into one flight-level DataFrame.

    File selection (one file per year, all-carrier ``bts_all_dfw_*`` preferred
    over AA-only ``bts_aa_dfw_*``) is delegated to ``_resolve_files`` so the
    rule lives in a single place. ``Month``, ``Year`` and ``Season`` columns
    are derived from ``FlightDate``.

    All years are concatenated in memory at once; for large histories prefer
    the per-year streaming done in ``main``.

    Returns:
        Concatenated flight rows with the added calendar columns.

    Raises:
        FileNotFoundError: if no BTS parquet files exist in ``RAW_DIR``.
    """
    files = _resolve_files()  # also prints the chosen file names
    if not files:
        raise FileNotFoundError(f"No BTS parquet files found in {RAW_DIR}. Run download_bts.py first.")

    df = pd.concat([pd.read_parquet(path) for path in files], ignore_index=True)

    df["FlightDate"] = pd.to_datetime(df["FlightDate"])
    df["Month"] = df["FlightDate"].dt.month
    df["Year"] = df["FlightDate"].dt.year
    df["Season"] = df["Month"].map({
        12: "winter", 1: "winter", 2: "winter",
        3: "spring", 4: "spring", 5: "spring",
        6: "summer", 7: "summer", 8: "summer",
        9: "fall", 10: "fall", 11: "fall",
    })

    print(f"Loaded {len(df):,} flights across {df['Year'].nunique()} years")
    return df


# ---------------------------------------------------------------------------
# Step 2: Airport-level weather risk features
# ---------------------------------------------------------------------------

def _tag_weather_events(flights: pd.DataFrame, airport_col: str) -> pd.DataFrame:
    """Return a narrow frame of per-flight weather indicators keyed by the non-DFW airport.

    Only the columns the airport aggregation reads are kept, which avoids
    copying every raw BTS column for both directions.
    """
    weather_delay = flights["WeatherDelay"].fillna(0)
    return pd.DataFrame({
        "airport": flights[airport_col],
        "Month": flights["Month"],
        "weather_delayed": (weather_delay >= WEATHER_DELAY_THRESHOLD_MIN).astype(int),
        # BTS CancellationCode "B" is the weather cancellation code.
        "weather_cancel": ((flights["Cancelled"] == 1) & (flights["CancellationCode"] == "B")).astype(int),
        "weather_delay_min": weather_delay,
        "nas_delayed": (flights["NASDelay"].fillna(0) >= WEATHER_DELAY_THRESHOLD_MIN).astype(int),
    })


def build_airport_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute per-airport, per-month weather aggregates used as features.

    The "airport" is the non-DFW end of each flight. Every flight between DFW
    and that airport, in either direction and including cancellations,
    contributes to its statistics.

    Args:
        df: flight rows with Origin, Dest, Month, WeatherDelay, NASDelay,
            Cancelled and CancellationCode columns.

    Returns:
        One row per (airport, Month) with monthly rates/percentiles, plus the
        airport's all-months ``overall_*`` rates broadcast onto each row.
    """
    combined = pd.concat(
        [
            _tag_weather_events(df[df["Dest"] == "DFW"], "Origin"),   # inbound: A → DFW
            _tag_weather_events(df[df["Origin"] == "DFW"], "Dest"),   # outbound: DFW → B
        ],
        ignore_index=True,
    )

    agg = combined.groupby(["airport", "Month"]).agg(
        total_flights=("weather_delayed", "count"),
        weather_delay_rate=("weather_delayed", "mean"),
        weather_cancel_rate=("weather_cancel", "mean"),
        avg_weather_delay_min=("weather_delay_min", "mean"),
        p75_weather_delay_min=("weather_delay_min", lambda x: x.quantile(0.75)),
        p95_weather_delay_min=("weather_delay_min", lambda x: x.quantile(0.95)),
        nas_delay_rate=("nas_delayed", "mean"),
    ).reset_index()

    # Overall (across all months) per airport
    overall = combined.groupby("airport").agg(
        overall_weather_delay_rate=("weather_delayed", "mean"),
        overall_weather_cancel_rate=("weather_cancel", "mean"),
        overall_avg_weather_delay_min=("weather_delay_min", "mean"),
    ).reset_index()

    agg = agg.merge(overall, on="airport", how="left")
    print(f"Airport features: {len(agg):,} rows ({agg['airport'].nunique()} airports × months)")
    return agg
--------------------------------------------------------------------------- # Step 3: Construct sequence pairs # --------------------------------------------------------------------------- def _parse_hhmm(series: pd.Series) -> pd.Series: """Convert HHMM integer (e.g. 1435) to minutes since midnight.""" s = series.fillna(0).astype(int).astype(str).str.zfill(4) hours = s.str[:2].astype(int) minutes = s.str[2:].astype(int) return hours * 60 + minutes def build_sequences(df: pd.DataFrame) -> pd.DataFrame: """ Build (airport_A, airport_B, date) aggregates without materializing flight-level pairs. For each date, collapse inbound flights to unique origins and outbound to unique destinations, then cross-join the airport aggregates (~100 unique origins × ~100 unique destinations = ~10k rows/day) instead of flight-level pairs (~400 × ~400 = 160k rows/day). """ inbound = df[(df["Dest"] == "DFW") & (df["Cancelled"] != 1)].copy() outbound = df[(df["Origin"] == "DFW") & (df["Cancelled"] != 1)].copy() inbound["arr_min"] = _parse_hhmm(inbound["ArrTime"]) outbound["dep_min"] = _parse_hhmm(outbound["DepTime"]) # Aggregate inbound flights to (date, airport_A): worst-case weather outcomes ib_agg = ( inbound.groupby(["FlightDate", "Origin", "Month", "Season", "Year"]) .agg( arr_min_earliest = ("arr_min", "min"), arr_min_latest = ("arr_min", "max"), weather_delay_A = ("WeatherDelay", lambda x: x.fillna(0).max()), arr_delay_A = ("ArrDelay", lambda x: x.fillna(0).max()), nas_delay_A = ("NASDelay", lambda x: x.fillna(0).max()), late_aircraft_A = ("LateAircraftDelay",lambda x: x.fillna(0).max()), ) .reset_index() .rename(columns={"Origin": "airport_A"}) ) # Aggregate outbound flights to (date, airport_B): worst-case weather outcomes ob_agg = ( outbound.groupby(["FlightDate", "Dest"]) .agg( dep_min_earliest = ("dep_min", "min"), dep_min_latest = ("dep_min", "max"), weather_delay_B = ("WeatherDelay", lambda x: x.fillna(0).max()), dep_delay_B = ("DepDelay", lambda x: x.fillna(0).max()), 
late_aircraft_B = ("LateAircraftDelay",lambda x: x.fillna(0).max()), ) .reset_index() .rename(columns={"Dest": "airport_B"}) ) # Merge on date → cross-join of unique airports per day (~10k rows/day vs 160k) pairs = ib_agg.merge(ob_agg, on="FlightDate", how="inner") pairs = pairs[pairs["airport_A"] != pairs["airport_B"]].copy() # Feasibility filter: at least one valid turnaround window exists # (earliest outbound departs after earliest inbound arrives + min turnaround, # and latest outbound departs before latest inbound arrives + max turnaround) turnaround_mid = pairs["dep_min_earliest"] - pairs["arr_min_latest"] feasible = ( (pairs["dep_min_latest"] >= pairs["arr_min_earliest"] + TURNAROUND_MIN_HRS * 60) & (pairs["dep_min_earliest"] <= pairs["arr_min_latest"] + TURNAROUND_MAX_HRS * 60) ) pairs = pairs[feasible].copy() pairs["turnaround_min"] = turnaround_mid[feasible].clip(lower=0) print(f"Constructed {len(pairs):,} airport-pair×date rows from {pairs['FlightDate'].nunique():,} dates") return pairs # --------------------------------------------------------------------------- # Step 4: Build pair-level feature matrix # --------------------------------------------------------------------------- def build_feature_matrix(pairs: pd.DataFrame, airport_features: pd.DataFrame) -> pd.DataFrame: """ Join airport-level features onto each pair and compute pair-level derived features. 
""" feat = airport_features.rename(columns=lambda c: f"A_{c}" if c not in ("airport", "Month") else c) feat_b = airport_features.rename(columns=lambda c: f"B_{c}" if c not in ("airport", "Month") else c) # Join airport A features pairs = pairs.merge( feat.rename(columns={"airport": "airport_A"}), on=["airport_A", "Month"], how="left" ) # Join airport B features pairs = pairs.merge( feat_b.rename(columns={"airport": "airport_B"}), on=["airport_B", "Month"], how="left" ) # Pair-level derived features pairs["pair_combined_weather_rate"] = ( pairs["A_weather_delay_rate"] * pairs["B_weather_delay_rate"] ) pairs["pair_max_weather_rate"] = pairs[["A_weather_delay_rate", "B_weather_delay_rate"]].max(axis=1) pairs["pair_min_weather_rate"] = pairs[["A_weather_delay_rate", "B_weather_delay_rate"]].min(axis=1) pairs["pair_weather_rate_sum"] = pairs["A_weather_delay_rate"] + pairs["B_weather_delay_rate"] pairs["pair_avg_weather_delay_min"] = ( pairs["A_avg_weather_delay_min"] + pairs["B_avg_weather_delay_min"] ) / 2 pairs["both_high_risk"] = ( (pairs["A_weather_delay_rate"] > pairs["A_weather_delay_rate"].quantile(0.75)) & (pairs["B_weather_delay_rate"] > pairs["B_weather_delay_rate"].quantile(0.75)) ).astype(int) # Season dummies pairs = pd.get_dummies(pairs, columns=["Season"], prefix="season", drop_first=False) pairs["is_spring_summer"] = pairs.get("season_spring", 0) | pairs.get("season_summer", 0) return pairs # --------------------------------------------------------------------------- # Step 5: Label sequences as high-risk # --------------------------------------------------------------------------- def label_sequences(pairs: pd.DataFrame) -> pd.DataFrame: """ A sequence is labeled 1 (bad) if: - Either leg had a significant weather delay (>= threshold), OR - Leg 1 arrival delay cascaded into leg 2 departure (late aircraft propagation) """ weather_A = pairs["weather_delay_A"].fillna(0) >= WEATHER_DELAY_THRESHOLD_MIN weather_B = pairs["weather_delay_B"].fillna(0) >= 
WEATHER_DELAY_THRESHOLD_MIN cascade = ( (pairs["arr_delay_A"].fillna(0) >= WEATHER_DELAY_THRESHOLD_MIN) & (pairs["late_aircraft_B"].fillna(0) >= WEATHER_DELAY_THRESHOLD_MIN) ) pairs["target"] = (weather_A | weather_B | cascade).astype(int) print(f"Label distribution: {pairs['target'].mean():.1%} high-risk sequences") return pairs # --------------------------------------------------------------------------- # Step 6: Final feature selection and save # --------------------------------------------------------------------------- FEATURE_COLS = [ # Airport A features "A_weather_delay_rate", "A_weather_cancel_rate", "A_avg_weather_delay_min", "A_p75_weather_delay_min", "A_p95_weather_delay_min", "A_nas_delay_rate", "A_overall_weather_delay_rate", "A_overall_avg_weather_delay_min", # Airport B features "B_weather_delay_rate", "B_weather_cancel_rate", "B_avg_weather_delay_min", "B_p75_weather_delay_min", "B_p95_weather_delay_min", "B_nas_delay_rate", "B_overall_weather_delay_rate", "B_overall_avg_weather_delay_min", # Pair features "pair_combined_weather_rate", "pair_max_weather_rate", "pair_min_weather_rate", "pair_weather_rate_sum", "pair_avg_weather_delay_min", "both_high_risk", # Temporal "Month", "is_spring_summer", "turnaround_min", # Season dummies (added dynamically) ] def save_features(pairs: pd.DataFrame, year: int = None): """ Aggregate flight-level pairs to (airport_A, airport_B, Month, Year). This is the right unit of analysis: we want to score pair × month combos, not individual sequences. Reduces 21M rows → ~tens of thousands. Features are stable airport-level stats; target is observed bad rate binarized. 
""" season_cols = [c for c in pairs.columns if c.startswith("season_")] # Stable feature cols: take first value per group (they're the same within group) static_feat_cols = [c for c in FEATURE_COLS + season_cols if c in pairs.columns and c not in ("Month", "turnaround_min")] agg_dict = {c: "first" for c in static_feat_cols} agg_dict["target"] = "mean" # observed bad rate agg_dict["turnaround_min"] = "median" # median turnaround for the pair×month agg_dict["airport_A"] = "first" agg_dict["airport_B"] = "first" grouped = ( pairs .groupby(["airport_A", "airport_B", "Month", "Year"]) .agg( n_sequences=("target", "count"), observed_bad_rate=("target", "mean"), median_turnaround_min=("turnaround_min", "median"), **{c: ("target" if c == "target" else c, "first") for c in static_feat_cols} ) ) # Flatten multi-index columns from named agg grouped = ( pairs .assign(n_seq=1) .groupby(["airport_A", "airport_B", "Month", "Year"]) .agg( n_sequences = ("n_seq", "count"), observed_bad_rate = ("target", "mean"), median_turnaround_min= ("turnaround_min", "median"), **{c: (c, "first") for c in static_feat_cols} ) .reset_index() ) grouped["Month"] = grouped["Month"].astype(int) # Provisional per-year threshold (will be re-set globally in main) threshold = grouped["observed_bad_rate"].median() grouped["target"] = (grouped["observed_bad_rate"] > threshold).astype(int) print(f" {year or ''}: {len(grouped):,} pair×month rows bad_rate threshold={threshold:.3f} high-risk={grouped['target'].mean():.1%}") return grouped # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def _resolve_files() -> list[str]: """Return best available file per year (all-carrier preferred over AA-only).""" all_files = sorted([ os.path.join(RAW_DIR, f) for f in os.listdir(RAW_DIR) if f.startswith("bts_all_dfw_") and f.endswith(".parquet") ]) aa_files = sorted([ os.path.join(RAW_DIR, f) for f in 
os.listdir(RAW_DIR) if f.startswith("bts_aa_dfw_") and f.endswith(".parquet") ]) year_file = {} for f in aa_files: yr = f.split("_")[-1].replace(".parquet", "") year_file[yr] = f for f in all_files: yr = f.split("_")[-1].replace(".parquet", "") year_file[yr] = f files = sorted(year_file.values()) print(f"Using files: {[os.path.basename(f) for f in files]}") return files def main(): files = _resolve_files() # Build airport features by streaming one year at a time, concat the small aggregates print("\nBuilding airport features (streaming per year)...") ap_agg_frames = [] for fpath in files: df_yr = pd.read_parquet(fpath) df_yr["FlightDate"] = pd.to_datetime(df_yr["FlightDate"]) df_yr["Month"] = df_yr["FlightDate"].dt.month df_yr["Year"] = df_yr["FlightDate"].dt.year ap_agg_frames.append(build_airport_features(df_yr)) del df_yr # Re-aggregate the per-year airport summaries into a single cross-year profile ap_combined = pd.concat(ap_agg_frames, ignore_index=True) del ap_agg_frames airport_features = ( ap_combined.groupby(["airport", "Month"]) .mean(numeric_only=True) .reset_index() ) airport_features.to_parquet(os.path.join(PROCESSED_DIR, "airport_features.parquet"), index=False) print(f"Airport features: {airport_features.shape}") del ap_combined # Sequences: use all-carrier files (broader training signal; airport risk is carrier-agnostic) seq_files = sorted([ os.path.join(RAW_DIR, f) for f in os.listdir(RAW_DIR) if f.startswith("bts_all_dfw_") and f.endswith(".parquet") ]) print(f"\nBuilding sequences from all-carrier files: {[os.path.basename(f) for f in seq_files]}") yearly_aggs = [] files = seq_files for fpath in files: year = int(os.path.basename(fpath).split("_")[-1].replace(".parquet","")) print(f"\n--- Processing {year} ---") df_year = pd.read_parquet(fpath) df_year["FlightDate"] = pd.to_datetime(df_year["FlightDate"]) df_year["Month"] = df_year["FlightDate"].dt.month df_year["Year"] = df_year["FlightDate"].dt.year df_year["Season"] = df_year["Month"].map({ 
12:"winter",1:"winter",2:"winter", 3:"spring",4:"spring",5:"spring", 6:"summer",7:"summer",8:"summer", 9:"fall",10:"fall",11:"fall", }) seqs = build_sequences(df_year) seqs = build_feature_matrix(seqs, airport_features) seqs = label_sequences(seqs) # Aggregate immediately per year to avoid accumulating flight-level DFs yearly_aggs.append(save_features(seqs, year=year)) del df_year, seqs # Combine all per-year aggregates and re-label with a consistent threshold all_seqs = pd.concat(yearly_aggs, ignore_index=True) threshold = all_seqs["observed_bad_rate"].median() all_seqs["target"] = (all_seqs["observed_bad_rate"] > threshold).astype(int) out_path = os.path.join(PROCESSED_DIR, "sequence_features.parquet") all_seqs.to_parquet(out_path, index=False) print(f"\nFinal: {len(all_seqs):,} pair×month×year rows saved → {out_path}") print(f"Global threshold: {threshold:.3f} High-risk rate: {all_seqs['target'].mean():.1%}") if __name__ == "__main__": main()