# AA-EPPS-Data-Challenge / src/feature_engineering.py
# Uploaded by itaykadosh: "Initial upload: AA EPPS Data Challenge app" (commit bef09da, verified)
"""
Feature engineering for the AA Crew Sequences Bad Weather XGBoost model.
Pipeline:
1. Load raw BTS parquet files
2. Build airport-level weather risk features (by airport x month)
3. Construct sequence pairs: inbound (A→DFW) + outbound (DFW→B) same day/pilot window
4. Build pair-level features
5. Label pairs as high-risk (target)
6. Save feature matrix for model training
"""
import os
import numpy as np
import pandas as pd
# Input and output directories, resolved relative to this source file (not the
# current working directory).
RAW_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "raw")
PROCESSED_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "processed")
# Import-time side effect: make sure the output directory exists.
os.makedirs(PROCESSED_DIR, exist_ok=True)
# Thresholds
WEATHER_DELAY_THRESHOLD_MIN = 15 # minutes to count as a weather delay event; the same cut-off is reused below for NAS, arrival and late-aircraft delays
TURNAROUND_MAX_HRS = 4 # max hours between inbound arrival and outbound departure
TURNAROUND_MIN_HRS = 0.5 # min turnaround in hours (shorter = invalid / not a real sequence)
# ---------------------------------------------------------------------------
# Step 1: Load all raw data
# ---------------------------------------------------------------------------
def load_raw() -> pd.DataFrame:
    """
    Read every selected BTS parquet file into one DataFrame and add calendar columns.

    One file is selected per year: the all-carrier file (``bts_all_dfw_<year>``)
    when it exists, otherwise the AA-only file (``bts_aa_dfw_<year>``).
    All selected files are read and concatenated in memory; ``FlightDate`` is
    then parsed and ``Month``, ``Year`` and ``Season`` are derived from it.

    Raises:
        FileNotFoundError: if RAW_DIR holds no matching parquet file.
    """
    def _matching(prefix):
        # Sorted full paths of the parquet files in RAW_DIR carrying this prefix.
        return sorted(
            os.path.join(RAW_DIR, name)
            for name in os.listdir(RAW_DIR)
            if name.startswith(prefix) and name.endswith(".parquet")
        )

    carrier_wide = _matching("bts_all_dfw_")
    aa_only = _matching("bts_aa_dfw_")

    # AA-only files are registered first so that an all-carrier file for the
    # same year (keyed by the text after the last underscore) replaces them.
    by_year = {
        path.split("_")[-1].replace(".parquet", ""): path
        for path in aa_only + carrier_wide
    }
    chosen = list(by_year.values())
    if not chosen:
        raise FileNotFoundError(f"No BTS parquet files found in {RAW_DIR}. Run download_bts.py first.")
    chosen.sort()
    print(f"Using files: {[os.path.basename(f) for f in chosen]}")

    # Every file is held in memory at once here; main() streams year by year instead.
    df = pd.concat([pd.read_parquet(path) for path in chosen], ignore_index=True)

    df["FlightDate"] = pd.to_datetime(df["FlightDate"])
    df["Month"] = df["FlightDate"].dt.month
    df["Year"] = df["FlightDate"].dt.year

    months_by_season = {
        "winter": (12, 1, 2),
        "spring": (3, 4, 5),
        "summer": (6, 7, 8),
        "fall": (9, 10, 11),
    }
    season_of_month = {
        month: season
        for season, months in months_by_season.items()
        for month in months
    }
    df["Season"] = df["Month"].map(season_of_month)

    print(f"Loaded {len(df):,} flights across {df['Year'].nunique()} years")
    return df
# ---------------------------------------------------------------------------
# Step 2: Airport-level weather risk features
# ---------------------------------------------------------------------------
def build_airport_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate weather-related statistics per (airport, Month).

    Flights into DFW are attributed to their origin and flights out of DFW to
    their destination, so "airport" is always the non-DFW end of the leg.
    Both directions are pooled before aggregating.

    Returns one row per (airport, Month) with flight counts, weather delay and
    cancellation rates, mean / 75th / 95th percentile weather delay minutes and
    the NAS delay rate, plus three ``overall_*`` columns computed per airport
    across all months of ``df``.
    """
    def _legs(mask, airport_col):
        # Flag weather / NAS events on one direction of DFW traffic.
        leg = df[mask].copy()
        weather_minutes = leg["WeatherDelay"].fillna(0)
        weather_cancelled = (leg["Cancelled"] == 1) & (leg["CancellationCode"] == "B")
        leg["airport"] = leg[airport_col]
        leg["weather_delayed"] = (weather_minutes >= WEATHER_DELAY_THRESHOLD_MIN).astype(int)
        leg["weather_cancel"] = weather_cancelled.astype(int)
        leg["weather_delay_min"] = weather_minutes
        leg["nas_delayed"] = (leg["NASDelay"].fillna(0) >= WEATHER_DELAY_THRESHOLD_MIN).astype(int)
        return leg

    combined = pd.concat(
        [
            _legs(df["Dest"] == "DFW", "Origin"),
            _legs(df["Origin"] == "DFW", "Dest"),
        ],
        ignore_index=True,
    )

    monthly = combined.groupby(["airport", "Month"]).agg(
        total_flights=("weather_delayed", "count"),
        weather_delay_rate=("weather_delayed", "mean"),
        weather_cancel_rate=("weather_cancel", "mean"),
        avg_weather_delay_min=("weather_delay_min", "mean"),
        p75_weather_delay_min=("weather_delay_min", lambda x: x.quantile(0.75)),
        p95_weather_delay_min=("weather_delay_min", lambda x: x.quantile(0.95)),
        nas_delay_rate=("nas_delayed", "mean"),
    )
    monthly = monthly.reset_index()

    # Month-independent profile of each airport, attached to every monthly row.
    all_months = combined.groupby("airport").agg(
        overall_weather_delay_rate=("weather_delayed", "mean"),
        overall_weather_cancel_rate=("weather_cancel", "mean"),
        overall_avg_weather_delay_min=("weather_delay_min", "mean"),
    )
    agg = monthly.merge(all_months.reset_index(), on="airport", how="left")

    print(f"Airport features: {len(agg):,} rows ({agg['airport'].nunique()} airports × months)")
    return agg
# ---------------------------------------------------------------------------
# Step 3: Construct sequence pairs
# ---------------------------------------------------------------------------
def _parse_hhmm(series: pd.Series) -> pd.Series:
"""Convert HHMM integer (e.g. 1435) to minutes since midnight."""
s = series.fillna(0).astype(int).astype(str).str.zfill(4)
hours = s.str[:2].astype(int)
minutes = s.str[2:].astype(int)
return hours * 60 + minutes
def build_sequences(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build one row per (FlightDate, airport_A, airport_B) candidate sequence.

    Non-cancelled inbound flights (A→DFW) are collapsed to one row per
    (date, origin) and non-cancelled outbound flights (DFW→B) to one row per
    (date, destination); the two summaries are then joined on the date.  This
    avoids materializing flight-level pairs.

    Each summary keeps the earliest/latest clock time of the group and the
    worst (maximum) delay minutes, with missing delays counted as 0.

    Returns:
        The joined rows with A != B that pass the turnaround screen, plus a
        ``turnaround_min`` column (earliest outbound departure minus latest
        inbound arrival, floored at 0).
    """
    flown = df[df["Cancelled"] != 1]
    inbound = flown[flown["Dest"] == "DFW"].copy()
    outbound = flown[flown["Origin"] == "DFW"].copy()
    inbound["arr_min"] = _parse_hhmm(inbound["ArrTime"])
    outbound["dep_min"] = _parse_hhmm(outbound["DepTime"])

    # Fill missing delay minutes once, up front, so the built-in "max"
    # aggregation can be used instead of a Python lambda per group.
    inbound_delay_cols = ["WeatherDelay", "ArrDelay", "NASDelay", "LateAircraftDelay"]
    outbound_delay_cols = ["WeatherDelay", "DepDelay", "LateAircraftDelay"]
    inbound[inbound_delay_cols] = inbound[inbound_delay_cols].fillna(0)
    outbound[outbound_delay_cols] = outbound[outbound_delay_cols].fillna(0)

    # Inbound summary per (date, airport_A): arrival window and worst delays.
    ib_agg = (
        inbound.groupby(["FlightDate", "Origin", "Month", "Season", "Year"])
        .agg(
            arr_min_earliest=("arr_min", "min"),
            arr_min_latest=("arr_min", "max"),
            weather_delay_A=("WeatherDelay", "max"),
            arr_delay_A=("ArrDelay", "max"),
            nas_delay_A=("NASDelay", "max"),
            late_aircraft_A=("LateAircraftDelay", "max"),
        )
        .reset_index()
        .rename(columns={"Origin": "airport_A"})
    )

    # Outbound summary per (date, airport_B): departure window and worst delays.
    ob_agg = (
        outbound.groupby(["FlightDate", "Dest"])
        .agg(
            dep_min_earliest=("dep_min", "min"),
            dep_min_latest=("dep_min", "max"),
            weather_delay_B=("WeatherDelay", "max"),
            dep_delay_B=("DepDelay", "max"),
            late_aircraft_B=("LateAircraftDelay", "max"),
        )
        .reset_index()
        .rename(columns={"Dest": "airport_B"})
    )

    # Joining on the date alone yields every (airport_A, airport_B) combination per day.
    pairs = ib_agg.merge(ob_agg, on="FlightDate", how="inner")
    pairs = pairs[pairs["airport_A"] != pairs["airport_B"]].copy()

    # Coarse turnaround screen on the daily windows (not on individual flights):
    #   - the latest outbound departure is at least the minimum turnaround
    #     after the earliest inbound arrival, and
    #   - the earliest outbound departure is no more than the maximum
    #     turnaround after the latest inbound arrival.
    # Rows with a missing window bound compare as False and are dropped.
    gap_latest_arr_to_earliest_dep = pairs["dep_min_earliest"] - pairs["arr_min_latest"]
    feasible = (
        (pairs["dep_min_latest"] >= pairs["arr_min_earliest"] + TURNAROUND_MIN_HRS * 60)
        & (pairs["dep_min_earliest"] <= pairs["arr_min_latest"] + TURNAROUND_MAX_HRS * 60)
    )
    pairs = pairs[feasible].copy()
    pairs["turnaround_min"] = gap_latest_arr_to_earliest_dep[feasible].clip(lower=0)

    print(f"Constructed {len(pairs):,} airport-pair×date rows from {pairs['FlightDate'].nunique():,} dates")
    return pairs
# ---------------------------------------------------------------------------
# Step 4: Build pair-level feature matrix
# ---------------------------------------------------------------------------
def build_feature_matrix(pairs: pd.DataFrame, airport_features: pd.DataFrame) -> pd.DataFrame:
    """
    Join airport-level features onto each pair and derive pair-level features.

    ``airport_features`` is merged twice on (airport, Month): once for
    airport_A with every statistic prefixed ``A_`` and once for airport_B with
    prefix ``B_``.  Pair features combine the two monthly weather delay rates
    (product, max, min, sum), average the two mean weather delays, and flag
    pairs where both rates exceed the 75th percentile of their column within
    ``pairs``.

    The ``Season`` column is replaced by four indicator columns
    (``season_fall``, ``season_spring``, ``season_summer``, ``season_winter``)
    that are always present, even when ``pairs`` does not contain every
    season, so the output schema does not depend on the data.

    Returns:
        A new DataFrame; the ``pairs`` argument is not modified.
    """
    join_keys = ("airport", "Month")

    def _prefixed(side):
        # Airport features with stat columns prefixed and the key renamed for this side.
        renamed = airport_features.rename(
            columns=lambda c: c if c in join_keys else f"{side}_{c}"
        )
        return renamed.rename(columns={"airport": f"airport_{side}"})

    for side in ("A", "B"):
        pairs = pairs.merge(_prefixed(side), on=[f"airport_{side}", "Month"], how="left")

    rate_a = pairs["A_weather_delay_rate"]
    rate_b = pairs["B_weather_delay_rate"]
    both_rates = pairs[["A_weather_delay_rate", "B_weather_delay_rate"]]

    pairs["pair_combined_weather_rate"] = rate_a * rate_b
    pairs["pair_max_weather_rate"] = both_rates.max(axis=1)
    pairs["pair_min_weather_rate"] = both_rates.min(axis=1)
    pairs["pair_weather_rate_sum"] = rate_a + rate_b
    pairs["pair_avg_weather_delay_min"] = (
        pairs["A_avg_weather_delay_min"] + pairs["B_avg_weather_delay_min"]
    ) / 2
    pairs["both_high_risk"] = (
        (rate_a > rate_a.quantile(0.75)) & (rate_b > rate_b.quantile(0.75))
    ).astype(int)

    # A categorical with a fixed category list makes get_dummies emit all four
    # season columns (alphabetical, matching its ordering for plain strings).
    seasons = ["fall", "spring", "summer", "winter"]
    pairs["Season"] = pd.Categorical(pairs["Season"], categories=seasons)
    pairs = pd.get_dummies(pairs, columns=["Season"], prefix="season", drop_first=False)
    pairs["is_spring_summer"] = pairs["season_spring"] | pairs["season_summer"]
    return pairs
# ---------------------------------------------------------------------------
# Step 5: Label sequences as high-risk
# ---------------------------------------------------------------------------
def label_sequences(pairs: pd.DataFrame) -> pd.DataFrame:
    """
    Add a binary ``target`` column marking bad sequences.

    A row gets target 1 when either of these holds (missing values count as 0):
      - the weather delay on leg A or on leg B reaches the threshold, or
      - the leg-A arrival delay and the leg-B late-aircraft delay both reach
        the threshold (delay propagated from the inbound to the outbound leg).

    The column is written onto ``pairs`` itself, which is also returned.
    """
    cutoff = WEATHER_DELAY_THRESHOLD_MIN

    def _reaches_cutoff(column):
        return pairs[column].fillna(0) >= cutoff

    weather_hit = _reaches_cutoff("weather_delay_A") | _reaches_cutoff("weather_delay_B")
    propagated = _reaches_cutoff("arr_delay_A") & _reaches_cutoff("late_aircraft_B")

    pairs["target"] = (weather_hit | propagated).astype(int)
    print(f"Label distribution: {pairs['target'].mean():.1%} high-risk sequences")
    return pairs
# ---------------------------------------------------------------------------
# Step 6: Final feature selection and save
# ---------------------------------------------------------------------------
# Per-airport statistics used as model inputs; each appears once with an "A_"
# prefix (inbound origin) and once with a "B_" prefix (outbound destination).
_AIRPORT_STAT_NAMES = (
    "weather_delay_rate",
    "weather_cancel_rate",
    "avg_weather_delay_min",
    "p75_weather_delay_min",
    "p95_weather_delay_min",
    "nas_delay_rate",
    "overall_weather_delay_rate",
    "overall_avg_weather_delay_min",
)
# Features derived from the two airports of a pair.
_PAIR_FEATURE_NAMES = (
    "pair_combined_weather_rate",
    "pair_max_weather_rate",
    "pair_min_weather_rate",
    "pair_weather_rate_sum",
    "pair_avg_weather_delay_min",
    "both_high_risk",
)
# Calendar / timing features.
_TEMPORAL_FEATURE_NAMES = ("Month", "is_spring_summer", "turnaround_min")

# Ordered model feature list: all A_ stats, all B_ stats, pair features, temporal.
# Season dummy columns are not listed here; save_features() appends them dynamically.
FEATURE_COLS = [
    *(f"{side}_{stat}" for side in ("A", "B") for stat in _AIRPORT_STAT_NAMES),
    *_PAIR_FEATURE_NAMES,
    *_TEMPORAL_FEATURE_NAMES,
]
def save_features(pairs: pd.DataFrame, year: int = None):
    """
    Collapse labeled pair rows to one row per (airport_A, airport_B, Month, Year).

    Despite its name this function writes nothing to disk; it returns the
    aggregated frame and the caller persists it.

    For each group the output holds:
      - ``n_sequences``: number of input rows,
      - ``observed_bad_rate``: mean of ``target``,
      - ``median_turnaround_min``: median of ``turnaround_min``,
      - the first non-null value of every column in FEATURE_COLS (except
        ``Month`` and ``turnaround_min``) and of every ``season_*`` column
        that exists in ``pairs``,
      - ``target``: 1 when ``observed_bad_rate`` is strictly above the median
        of ``observed_bad_rate`` over the returned rows, else 0.  This
        threshold is provisional; main() recomputes it across all years.

    Args:
        pairs: labeled pair rows (output of label_sequences).
        year: only used to tag the progress message.
    """
    season_cols = [c for c in pairs.columns if c.startswith("season_")]
    # Month is a grouping key and turnaround_min is summarized by its median,
    # so neither is carried over with "first".
    handled_elsewhere = ("Month", "turnaround_min")
    static_feat_cols = [
        c
        for c in FEATURE_COLS + season_cols
        if c in pairs.columns and c not in handled_elsewhere
    ]

    grouped = (
        pairs
        .assign(n_seq=1)
        .groupby(["airport_A", "airport_B", "Month", "Year"])
        .agg(
            n_sequences=("n_seq", "count"),
            observed_bad_rate=("target", "mean"),
            median_turnaround_min=("turnaround_min", "median"),
            **{c: (c, "first") for c in static_feat_cols},
        )
        .reset_index()
    )
    grouped["Month"] = grouped["Month"].astype(int)

    # Provisional per-year threshold (will be re-set globally in main)
    threshold = grouped["observed_bad_rate"].median()
    grouped["target"] = (grouped["observed_bad_rate"] > threshold).astype(int)
    print(f" {year or ''}: {len(grouped):,} pair×month rows bad_rate threshold={threshold:.3f} high-risk={grouped['target'].mean():.1%}")
    return grouped
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _resolve_files() -> list[str]:
"""Return best available file per year (all-carrier preferred over AA-only)."""
all_files = sorted([
os.path.join(RAW_DIR, f)
for f in os.listdir(RAW_DIR)
if f.startswith("bts_all_dfw_") and f.endswith(".parquet")
])
aa_files = sorted([
os.path.join(RAW_DIR, f)
for f in os.listdir(RAW_DIR)
if f.startswith("bts_aa_dfw_") and f.endswith(".parquet")
])
year_file = {}
for f in aa_files:
yr = f.split("_")[-1].replace(".parquet", "")
year_file[yr] = f
for f in all_files:
yr = f.split("_")[-1].replace(".parquet", "")
year_file[yr] = f
files = sorted(year_file.values())
print(f"Using files: {[os.path.basename(f) for f in files]}")
return files
def _read_year_frame(fpath: str, with_season: bool = False) -> pd.DataFrame:
    """Read one raw parquet file and add FlightDate-derived Month/Year (and optionally Season)."""
    frame = pd.read_parquet(fpath)
    frame["FlightDate"] = pd.to_datetime(frame["FlightDate"])
    frame["Month"] = frame["FlightDate"].dt.month
    frame["Year"] = frame["FlightDate"].dt.year
    if with_season:
        frame["Season"] = frame["Month"].map({
            12: "winter", 1: "winter", 2: "winter",
            3: "spring", 4: "spring", 5: "spring",
            6: "summer", 7: "summer", 8: "summer",
            9: "fall", 10: "fall", 11: "fall",
        })
    return frame


def main():
    """
    Run the full pipeline and write both parquet outputs to PROCESSED_DIR.

    1. Airport features are built one year at a time from the best file per
       year, then the per-year rows are averaged per (airport, Month) and
       saved as ``airport_features.parquet``.
    2. Sequences are built, featurized, labeled and aggregated one year at a
       time from the all-carrier files only.
    3. The per-year aggregates are concatenated, ``target`` is recomputed
       against the median ``observed_bad_rate`` of the combined rows, and the
       result is saved as ``sequence_features.parquet``.

    Raises:
        FileNotFoundError: if no raw files exist, or if no all-carrier file
            exists for the sequence step.
    """
    files = _resolve_files()
    if not files:
        # Fail with a clear message instead of "No objects to concatenate" below.
        raise FileNotFoundError(f"No BTS parquet files found in {RAW_DIR}. Run download_bts.py first.")

    # Build airport features by streaming one year at a time, concat the small aggregates
    print("\nBuilding airport features (streaming per year)...")
    ap_agg_frames = []
    for fpath in files:
        df_yr = _read_year_frame(fpath)
        ap_agg_frames.append(build_airport_features(df_yr))
        del df_yr

    # Re-aggregate the per-year airport summaries into a single cross-year profile.
    # This is a plain mean over the yearly rows: years are not weighted by flight count.
    ap_combined = pd.concat(ap_agg_frames, ignore_index=True)
    del ap_agg_frames
    airport_features = (
        ap_combined.groupby(["airport", "Month"])
        .mean(numeric_only=True)
        .reset_index()
    )
    airport_features.to_parquet(os.path.join(PROCESSED_DIR, "airport_features.parquet"), index=False)
    print(f"Airport features: {airport_features.shape}")
    del ap_combined

    # Sequences: use all-carrier files (broader training signal; airport risk is carrier-agnostic)
    seq_files = sorted([
        os.path.join(RAW_DIR, f)
        for f in os.listdir(RAW_DIR)
        if f.startswith("bts_all_dfw_") and f.endswith(".parquet")
    ])
    print(f"\nBuilding sequences from all-carrier files: {[os.path.basename(f) for f in seq_files]}")
    if not seq_files:
        raise FileNotFoundError(
            f"No all-carrier BTS parquet files (bts_all_dfw_*.parquet) found in {RAW_DIR}; "
            "sequences cannot be built."
        )

    yearly_aggs = []
    for fpath in seq_files:
        year = int(os.path.basename(fpath).split("_")[-1].replace(".parquet", ""))
        print(f"\n--- Processing {year} ---")
        df_year = _read_year_frame(fpath, with_season=True)
        seqs = build_sequences(df_year)
        seqs = build_feature_matrix(seqs, airport_features)
        seqs = label_sequences(seqs)
        # Aggregate immediately per year to avoid accumulating pair-level frames
        yearly_aggs.append(save_features(seqs, year=year))
        del df_year, seqs

    # Combine all per-year aggregates and re-label with a consistent threshold
    all_seqs = pd.concat(yearly_aggs, ignore_index=True)
    threshold = all_seqs["observed_bad_rate"].median()
    all_seqs["target"] = (all_seqs["observed_bad_rate"] > threshold).astype(int)

    out_path = os.path.join(PROCESSED_DIR, "sequence_features.parquet")
    all_seqs.to_parquet(out_path, index=False)
    print(f"\nFinal: {len(all_seqs):,} pair×month×year rows saved → {out_path}")
    print(f"Global threshold: {threshold:.3f} High-risk rate: {all_seqs['target'].mean():.1%}")
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()