Spaces:
Sleeping
Sleeping
| """ | |
| XGBoost model to classify high-risk crew sequences (A→DFW→B pairs). | |
| Outputs: | |
| - Trained model saved to data/processed/xgb_model.json | |
| - Per-airport-pair risk scores saved to data/processed/pair_risk_scores.parquet | |
| - Feature importance plot saved to data/processed/feature_importance.png | |
| """ | |
| import os | |
| import json | |
| import numpy as np | |
| import pandas as pd | |
| import xgboost as xgb | |
| from sklearn.model_selection import StratifiedGroupKFold, cross_val_score | |
| from sklearn.metrics import ( | |
| classification_report, roc_auc_score, average_precision_score, | |
| confusion_matrix, | |
| ) | |
| from sklearn.preprocessing import LabelEncoder | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
# Data directories, resolved relative to this file's own directory
# (i.e. <dir of this file>/../data/processed and <dir of this file>/../data/raw).
PROCESSED_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "processed")
RAW_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "raw")
# Absolute observed_bad_rate threshold: pairs where >25% of sequences had weather
# disruption are labeled high-risk. More meaningful than median (which yields 50/50 split).
# The comparison applied in load_features() is strict (observed_bad_rate > RISK_THRESHOLD).
RISK_THRESHOLD = 0.25
# Candidate model input columns. main() appends any "season_*" columns found in
# the loaded frame and then keeps only the names actually present, so entries
# whose source file was not joined are dropped rather than raising.
FEATURE_COLS = [
    # Airport A BTS weather stats
    "A_weather_delay_rate", "A_weather_cancel_rate", "A_avg_weather_delay_min",
    "A_p75_weather_delay_min", "A_p95_weather_delay_min", "A_nas_delay_rate",
    "A_overall_weather_delay_rate", "A_overall_avg_weather_delay_min",
    # Airport B BTS weather stats
    "B_weather_delay_rate", "B_weather_cancel_rate", "B_avg_weather_delay_min",
    "B_p75_weather_delay_min", "B_p95_weather_delay_min", "B_nas_delay_rate",
    "B_overall_weather_delay_rate", "B_overall_avg_weather_delay_min",
    # Pair-level BTS features
    "pair_combined_weather_rate", "pair_max_weather_rate", "pair_min_weather_rate",
    "pair_weather_rate_sum", "pair_avg_weather_delay_min", "both_high_risk",
    # Temporal
    "Month", "is_spring_summer", "median_turnaround_min",
    # GSOM weather features (wind + precip; ~55% airports have station coverage)
    # XGBoost handles NaN natively — missing airports simply skip these splits
    "A_avg_wind_speed", "A_precip_days", "A_extreme_precip",
    "A_total_precip", "A_max_wind_gust",
    "B_avg_wind_speed", "B_precip_days", "B_extreme_precip",
    "B_total_precip", "B_max_wind_gust",
    "pair_max_avg_wind_speed", "pair_max_precip_days",
    "pair_max_extreme_precip", "pair_max_total_precip", "pair_max_max_wind_gust",
    # DFW hub weather (all sequences pass through DFW); joined by Month in load_features()
    "DFW_weather_delay_rate", "DFW_weather_cancel_rate",
    "DFW_avg_weather_delay_min", "DFW_p95_weather_delay_min",
    # Tail-chain: crew duty context from aircraft rotation (proxy for same crew)
    "tc_legs_before_mean", "tc_block_before_mean", "tc_duty_start_hour",
    "tc_total_duty_mean", "tc_total_duty_p75",
    "tc_fdp_util_mean", "tc_fdp_util_p75", "tc_fdp_overrun_rate",
    "tc_wocl_rate", "tc_legs_after_mean", "tc_legs_in_day_mean",
    "tc_downstream_rate", "tc_cascade_late_rate",
    "tc_cascade_late_min", "tc_cascade_amplif_mean",
    # Airport-level cascade propagation index
    # (pair_cascade_product / pair_max_cascade_rate are derived in load_features())
    "A_ap_cascade_rate", "A_ap_cascade_given_late",
    "B_ap_cascade_rate", "B_ap_cascade_given_late",
    "pair_cascade_product", "pair_max_cascade_rate",
    # Multi-hop DFW cascade: A→DFW→B→DFW→C→DFW→D
    "mhc_n_hops_mean", "mhc_n_hops_max",
    "mhc_total_late_min_mean", "mhc_total_late_min_p75",
    "mhc_cascade_hop_rate", "mhc_cascade_depth_mean",
    "mhc_unique_airports_mean", "mhc_recovery_rate",
]
def get_dfw_weather() -> pd.DataFrame:
    """
    Compute DFW monthly weather stats from raw BTS files.

    DFW is the hub for every sequence, so its weather is a shared risk factor
    not captured in airport_features (which only covers non-DFW airports).
    Results cached to data/processed/dfw_weather_monthly.parquet; when that
    cache file exists it is returned as-is and the raw files are not read.

    Returns:
        One row per ``Month`` with the columns ``DFW_weather_delay_rate``,
        ``DFW_weather_cancel_rate``, ``DFW_avg_weather_delay_min`` and
        ``DFW_p95_weather_delay_min``.

    Raises:
        FileNotFoundError: There is no cache and no raw file matches either
            ``bts_all_dfw_*.parquet`` or ``bts_aa_dfw_*.parquet`` in RAW_DIR.
    """
    cache = os.path.join(PROCESSED_DIR, "dfw_weather_monthly.parquet")
    if os.path.exists(cache):
        return pd.read_parquet(cache)

    import glob

    # Prefer the all-carrier extracts; fall back to the AA-only ones.
    files = sorted(glob.glob(os.path.join(RAW_DIR, "bts_all_dfw_*.parquet")))
    if not files:
        files = sorted(glob.glob(os.path.join(RAW_DIR, "bts_aa_dfw_*.parquet")))
    if not files:
        # Without this guard pd.concat([]) fails with an opaque
        # "No objects to concatenate" that hides the real cause.
        raise FileNotFoundError(
            f"No raw BTS files matching 'bts_all_dfw_*.parquet' or "
            f"'bts_aa_dfw_*.parquet' found in {RAW_DIR}"
        )
    print(f"Computing DFW weather from {len(files)} raw files...")

    frames = []
    for f in files:
        df = pd.read_parquet(f)
        # All flights in dataset touch DFW (Origin or Dest)
        # Missing WeatherDelay is treated as 0 minutes.
        weather_delay_min = df["WeatherDelay"].fillna(0)
        df["weather_delayed"] = (weather_delay_min >= 15).astype(int)
        # Cancelled flights with CancellationCode "B" are counted as weather cancels.
        df["weather_cancel"] = (
            (df["Cancelled"] == 1) & (df["CancellationCode"] == "B")
        ).astype(int)
        df["weather_delay_min"] = weather_delay_min
        frames.append(df[["Month", "weather_delayed", "weather_cancel", "weather_delay_min"]])

    combined = pd.concat(frames, ignore_index=True)
    dfw = (
        combined.groupby("Month")
        .agg(
            DFW_weather_delay_rate=("weather_delayed", "mean"),
            DFW_weather_cancel_rate=("weather_cancel", "mean"),
            DFW_avg_weather_delay_min=("weather_delay_min", "mean"),
            DFW_p95_weather_delay_min=("weather_delay_min", lambda x: x.quantile(0.95)),
        )
        .reset_index()
    )

    # Make sure the cache directory exists so a fresh checkout does not fail on write.
    os.makedirs(PROCESSED_DIR, exist_ok=True)
    dfw.to_parquet(cache, index=False)
    print(f"DFW weather cached → {cache}")
    return dfw
def load_features():
    """
    Load sequence features, recompute the label, and left-join the optional
    feature tables.

    Steps, in order:
      1. Read sequence_features.parquet from PROCESSED_DIR.
      2. Overwrite ``target`` with ``observed_bad_rate > RISK_THRESHOLD``.
      3. Join DFW hub weather on ``Month``.
      4. If present on disk, join tail-chain, airport-cascade (once per side,
         prefixed ``A_ap_`` / ``B_ap_``) and multi-hop cascade features.
         A missing file only prints a notice; it is not an error.

    Returns:
        ``(df, season_cols)`` where ``season_cols`` lists the columns of ``df``
        whose name starts with ``"season_"``.
    """
    pair_keys = ["airport_A", "airport_B", "Month", "Year"]

    frame = pd.read_parquet(os.path.join(PROCESSED_DIR, "sequence_features.parquet"))

    # Fix label: use absolute threshold (>25% bad rate = high risk) instead of
    # the pre-baked median-based target which forces a meaningless 50/50 split.
    is_high_risk = frame["observed_bad_rate"] > RISK_THRESHOLD
    frame["target"] = is_high_risk.astype(int)
    print(f"Target recomputed: {frame['target'].mean():.1%} positive "
          f"(observed_bad_rate > {RISK_THRESHOLD})")

    # DFW hub weather is keyed by month only.
    hub_weather = get_dfw_weather()
    frame = frame.merge(hub_weather, on="Month", how="left")

    # Tail-chain features (crew duty proxy via aircraft rotation).
    tail_path = os.path.join(PROCESSED_DIR, "tail_chain_features.parquet")
    if not os.path.exists(tail_path):
        print("tail_chain_features.parquet not found — skipping tail-chain features")
    else:
        tail = pd.read_parquet(tail_path)
        tail_feats = [name for name in tail.columns if name not in pair_keys]
        tail = tail[pair_keys + tail_feats]
        frame = frame.merge(tail, on=pair_keys, how="left")
        print(f"Tail-chain joined: {len(tail_feats)} features, {tail['airport_A'].nunique()} airports covered")

    # Airport-level cascade propagation, attached separately for side A and side B.
    cascade_path = os.path.join(PROCESSED_DIR, "airport_cascade_features.parquet")
    if not os.path.exists(cascade_path):
        print("airport_cascade_features.parquet not found — skipping airport cascade features")
    else:
        cascade = pd.read_parquet(cascade_path)
        cascade_feats = [name for name in cascade.columns if name not in ("airport", "Month")]
        for side in ("A", "B"):
            airport_key = f"airport_{side}"
            prefixed = {name: f"{side}_ap_{name}" for name in cascade_feats}
            side_table = cascade.rename(columns={"airport": airport_key, **prefixed})
            side_table = side_table[[airport_key, "Month"] + list(prefixed.values())]
            frame = frame.merge(side_table, on=[airport_key, "Month"], how="left")
        # Pair interaction features, only when both sides carry a cascade rate.
        has_both_rates = (
            "A_ap_cascade_rate" in frame.columns
            and "B_ap_cascade_rate" in frame.columns
        )
        if has_both_rates:
            frame["pair_cascade_product"] = frame["A_ap_cascade_rate"] * frame["B_ap_cascade_rate"]
            frame["pair_max_cascade_rate"] = frame[["A_ap_cascade_rate", "B_ap_cascade_rate"]].max(axis=1)
        print(f"Airport cascade joined: {len(cascade_feats)} features per airport")

    # Multi-hop DFW cascade features.
    multihop_path = os.path.join(PROCESSED_DIR, "multihop_cascade_features.parquet")
    if not os.path.exists(multihop_path):
        print("multihop_cascade_features.parquet not found — skipping multi-hop cascade features")
    else:
        multihop = pd.read_parquet(multihop_path)
        multihop_feats = [name for name in multihop.columns if name not in pair_keys]
        frame = frame.merge(multihop[pair_keys + multihop_feats], on=pair_keys, how="left")
        print(f"Multi-hop cascade joined: {len(multihop_feats)} features")

    season_cols = [name for name in frame.columns if name.startswith("season_")]
    return frame, season_cols
def train(
    df: pd.DataFrame,
    feature_cols: list[str],
    device: str = "cuda",
) -> xgb.XGBClassifier:
    """
    Fit an XGBoost classifier on a time-based split and print validation metrics.

    Rows from every year before the latest ``Year`` form the training set; rows
    from the latest ``Year`` form the validation set, which is also the
    early-stopping ``eval_set``.

    Args:
        df: Frame containing ``feature_cols``, an integer-castable ``target``
            column and a ``Year`` column.
        feature_cols: Names of the input columns; they are cast to float.
        device: Passed to ``XGBClassifier(device=...)``. Defaults to ``"cuda"``
            (the previous hard-coded value); pass ``"cpu"`` on machines
            without a GPU.

    Returns:
        The fitted classifier.

    Raises:
        ValueError: The time split leaves the training or validation set
            empty, or the training set is missing one of the two classes.
    """
    X = df[feature_cols].astype(float)
    y = df["target"].astype(int)

    # Time-based split: train on earlier years, validate on most recent year
    latest_year = df["Year"].max()
    train_mask = df["Year"] < latest_year
    val_mask = df["Year"] == latest_year
    if not train_mask.any() or not val_mask.any():
        # A single year (or an all-null Year column) would otherwise surface
        # as an obscure failure inside fit().
        raise ValueError(
            "Time-based split needs at least two distinct Year values "
            f"(train rows: {int(train_mask.sum()):,}, val rows: {int(val_mask.sum()):,})"
        )
    X_train, y_train = X[train_mask], y[train_mask]
    X_val, y_val = X[val_mask], y[val_mask]

    # Class imbalance: weight the minority class. The ratio is taken from the
    # training rows only so the validation year's label balance does not leak
    # into the model configuration.
    neg, pos = int((y_train == 0).sum()), int((y_train == 1).sum())
    if pos == 0 or neg == 0:
        raise ValueError(
            "Training split must contain both classes "
            f"(negative: {neg:,}, positive: {pos:,})"
        )
    scale_pos_weight = neg / pos
    print(f"Class balance — negative: {neg:,}, positive: {pos:,}, scale_pos_weight: {scale_pos_weight:.2f}")

    model = xgb.XGBClassifier(
        n_estimators=500,
        max_depth=6,
        learning_rate=0.05,
        subsample=0.8,
        colsample_bytree=0.8,
        scale_pos_weight=scale_pos_weight,
        eval_metric="aucpr",  # average precision — better for imbalanced
        early_stopping_rounds=30,
        random_state=42,
        n_jobs=-1,
        device=device,
        tree_method="hist",
    )

    print(f"Train: {len(X_train):,} rows | Val: {len(X_val):,} rows")
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        verbose=50,
    )

    # Evaluation
    # NOTE(review): these metrics are computed on the same rows that drove
    # early stopping, so they are likely optimistic; there is no separate test set.
    y_pred_proba = model.predict_proba(X_val)[:, 1]
    y_pred = model.predict(X_val)
    print("\n--- Validation Results ---")
    print(f"ROC-AUC: {roc_auc_score(y_val, y_pred_proba):.4f}")
    print(f"Average Precision: {average_precision_score(y_val, y_pred_proba):.4f}")
    print("\nClassification Report:")
    print(classification_report(y_val, y_pred, target_names=["low_risk", "high_risk"]))
    return model
def plot_feature_importance(model: xgb.XGBClassifier, feature_cols: list[str]):
    """
    Save a horizontal bar chart of the 20 largest feature importances.

    Args:
        model: Fitted classifier; its ``feature_importances_`` are plotted.
        feature_cols: Feature names in the same order the model was trained
            with, used to label the bars.

    Writes ``feature_importance.png`` into PROCESSED_DIR and prints the path.
    """
    importances = model.feature_importances_
    feat_df = pd.DataFrame({"feature": feature_cols, "importance": importances})
    # Ascending sort + tail(20) keeps the top 20 and puts the largest bar at the top.
    feat_df = feat_df.sort_values("importance", ascending=True).tail(20)

    fig, ax = plt.subplots(figsize=(10, 8))
    try:
        ax.barh(feat_df["feature"], feat_df["importance"], color="steelblue")
        # NOTE(review): the label assumes the model's importance type is gain — confirm.
        ax.set_xlabel("Feature Importance (gain)")
        ax.set_title("Top 20 XGBoost Feature Importances\nCrew Sequence Weather Risk")
        fig.tight_layout()
        out = os.path.join(PROCESSED_DIR, "feature_importance.png")
        fig.savefig(out, dpi=150)
    finally:
        # pyplot keeps every figure alive until it is closed; release it even
        # if rendering or saving fails.
        plt.close(fig)
    print(f"Feature importance plot saved → {out}")
def score_all_pairs(model: xgb.XGBClassifier, df: pd.DataFrame, feature_cols: list[str]):
    """
    Aggregate sequence-level predictions into per-pair (airport_A, airport_B, Month)
    risk scores. This is the final deliverable: which pairs are risky, in which months.

    Each row of ``df`` is scored with the positive-class probability, then rows
    are grouped by (airport_A, airport_B, Month). The result is sorted by
    ``avg_risk_score`` descending, written to pair_risk_scores.parquet in
    PROCESSED_DIR, and returned. The caller's ``df`` is not modified.
    """
    scored = df.copy()
    scored["risk_score"] = model.predict_proba(df[feature_cols].astype(float))[:, 1]

    # Fall back to derived aggregates when the optional columns are absent.
    if "n_sequences" in scored.columns:
        count_spec = ("n_sequences", "sum")
    else:
        count_spec = ("risk_score", "count")
    if "observed_bad_rate" in scored.columns:
        bad_rate_spec = ("observed_bad_rate", "mean")
    else:
        bad_rate_spec = ("target", "mean")

    by_pair_month = scored.groupby(["airport_A", "airport_B", "Month"])
    summary = by_pair_month.agg(
        avg_risk_score=("risk_score", "mean"),
        max_risk_score=("risk_score", "max"),
        n_sequences=count_spec,
        observed_bad_rate=bad_rate_spec,
    )
    pair_scores = summary.reset_index().sort_values("avg_risk_score", ascending=False)

    out = os.path.join(PROCESSED_DIR, "pair_risk_scores.parquet")
    pair_scores.to_parquet(out, index=False)
    print(f"\nPair risk scores saved → {out}")
    print("\nTop 20 riskiest airport pairs:")
    print(pair_scores.head(20).to_string(index=False))
    return pair_scores
def main():
    """
    Run the full pipeline: load features, train, save the model, plot
    feature importances, and write per-pair risk scores.
    """
    df, season_cols = load_features()

    # Only keep candidate features that actually made it into the frame;
    # optional feature files may not have been joined.
    candidates = [*FEATURE_COLS, *season_cols]
    feature_cols = [name for name in candidates if name in df.columns]

    model = train(df, feature_cols)

    # Save model
    model_path = os.path.join(PROCESSED_DIR, "xgb_model.json")
    model.save_model(model_path)
    print(f"\nModel saved → {model_path}")

    plot_feature_importance(model, feature_cols)
    score_all_pairs(model, df, feature_cols)


if __name__ == "__main__":
    main()