| """ |
| src/features/disruption_index.py |
| ================================== |
| Computes the Disruption Index — a composite 0–100 score representing |
| regional aviation disruption severity at each timestamp. |
| |
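| Formula: |
| DI = w1 * cancellation_rate * 40 |
| + w2 * (avg_delay_minutes / MAX_DELAY) * 30 |
| + w3 * (airspace_risk_score / 4) * 20 |
| + w4 * conflict_event_count_norm * 10 |
| |
| where conflict_event_count_norm = conflict_event_count / MAX_EVENTS. |
| Every component is clipped to [0, 1] before scaling, and the weights |
| w1..w4 are all 1 in compute_disruption_index, so DI lies in [0, 100]. |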
| """ |
|
|
| import pandas as pd |
| import numpy as np |
| from src.utils.logger import get_logger |
| from config.settings import ( |
| ROLLING_WINDOW_6H, ROLLING_WINDOW_24H, PROCESSED_DIR |
| ) |
|
|
| logger = get_logger(__name__) |
|
|
# Normalisation ceilings used when scaling raw inputs into the [0, 1] range.
# Values at or above a ceiling saturate the corresponding component.
MAX_DELAY: float = 300.0   # divisor for avg_delay_minutes
MAX_EVENTS: float = 20.0   # divisor for conflict_event_count
|
|
|
|
def compute_disruption_index(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of *df* with a ``disruption_index`` column (0–100).

    The score is the sum of four components, each clipped to [0, 1] and
    then scaled: cancellation_rate (x40), avg_delay_minutes / MAX_DELAY
    (x30), airspace_risk_score / 4 (x20) and
    conflict_event_count / MAX_EVENTS (x10).

    Any of those input columns that is absent is treated as all zeros;
    cells that cannot be parsed as numbers (or are missing) count as 0.

    Returns
    -------
    pd.DataFrame
        Copy of the input with disruption_index added or overwritten,
        rounded to two decimals. The input frame is not modified.
    """
    out = df.copy()

    def _numeric(column: str) -> pd.Series:
        # Fall back to an all-zero series when the column is absent, then
        # coerce unparseable cells to NaN and replace them with 0.
        raw = out.get(column, pd.Series(0.0, index=out.index))
        return pd.to_numeric(raw, errors="coerce").fillna(0)

    # Each share is bounded to [0, 1] so no single input can push the
    # total outside the 0–100 range.
    cancel_share = _numeric("cancellation_rate").clip(0, 1)
    delay_share = (_numeric("avg_delay_minutes") / MAX_DELAY).clip(0, 1)
    risk_share = (_numeric("airspace_risk_score") / 4).clip(0, 1)
    event_share = (_numeric("conflict_event_count") / MAX_EVENTS).clip(0, 1)

    score = (
        cancel_share * 40
        + delay_share * 30
        + risk_share * 20
        + event_share * 10
    )
    out["disruption_index"] = score.round(2)

    index_col = out["disruption_index"]
    logger.debug(
        "Disruption index computed: mean=%.2f, max=%.2f",
        index_col.mean(),
        index_col.max(),
    )
    return out
|
|
|
|
def add_rolling_disruption_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of *df* with per-airport rolling/lag disruption features.

    The ``timestamp`` column is parsed with ``pd.to_datetime`` (unparseable
    values become NaT) and the rows are sorted by ``airport_code`` then
    ``timestamp`` before the features are computed within each airport.

    Columns added:
        disruption_index_6h    -- trailing mean over ROLLING_WINDOW_6H
        disruption_index_24h   -- trailing mean over ROLLING_WINDOW_24H
        disruption_index_lag6h -- value shifted by ROLLING_WINDOW_6H; where
                                  no earlier value exists, the row's own
                                  disruption_index is used instead

    Requires ``timestamp``, ``airport_code`` and ``disruption_index``
    columns. The window sizes come from config and are passed straight to
    ``rolling()`` / ``shift()``.
    """
    frame = df.copy()
    frame["timestamp"] = pd.to_datetime(frame["timestamp"], errors="coerce")
    frame = frame.sort_values(["airport_code", "timestamp"])

    per_airport = frame.groupby("airport_code")["disruption_index"]

    def _trailing_mean(window):
        """Build the per-group transform for a trailing mean over *window*."""
        def _apply(series):
            # min_periods=1 keeps the first rows of each group non-NaN.
            return series.rolling(window, min_periods=1).mean()
        return _apply

    def _lagged(series):
        return series.shift(ROLLING_WINDOW_6H)

    rolling_columns = (
        ("disruption_index_6h", ROLLING_WINDOW_6H),
        ("disruption_index_24h", ROLLING_WINDOW_24H),
    )
    for column, window in rolling_columns:
        frame[column] = per_airport.transform(_trailing_mean(window)).round(2)

    frame["disruption_index_lag6h"] = per_airport.transform(_lagged).round(2)

    # Rows at the start of each group have no value to look back to; fall
    # back to the current index so the column has no gaps from the shift.
    current = frame["disruption_index"]
    frame["disruption_index_lag6h"] = frame["disruption_index_lag6h"].fillna(current)

    return frame
|
|
|
|
def label_high_disruption(df: pd.DataFrame, threshold: float = 50.0) -> pd.DataFrame:
    """
    Return a copy of *df* with a binary ``is_high_disruption`` column.

    A row is labelled 1 when its ``disruption_index`` is strictly greater
    than *threshold*, otherwise 0. The input frame is left untouched.
    """
    labelled = df.copy()
    above_threshold = labelled["disruption_index"].gt(threshold)
    labelled["is_high_disruption"] = above_threshold.astype(int)
    return labelled
|
|
|
|
def run(df: pd.DataFrame = None) -> pd.DataFrame:
    """
    Run the full disruption index pipeline and return the enriched frame.

    When *df* is omitted, ``flight_disruptions.csv`` is read from
    PROCESSED_DIR. The frame then passes through, in order:
    compute_disruption_index, add_rolling_disruption_features and
    label_high_disruption (with its default threshold).
    """
    if df is None:
        source = PROCESSED_DIR / "flight_disruptions.csv"
        df = pd.read_csv(source, low_memory=False)
        logger.info("Loaded %d rows from %s", len(df), source.name)

    # Order matters: the rolling features and the label both read the
    # disruption_index column produced by the first stage.
    stages = (
        compute_disruption_index,
        add_rolling_disruption_features,
        label_high_disruption,
    )
    for stage in stages:
        df = stage(df)

    return df
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the pipeline on the default CSV and show a
    # short preview of the key output columns.
    preview_columns = [
        "airport_code",
        "timestamp",
        "disruption_index",
        "disruption_index_24h",
        "is_high_disruption",
    ]
    result = run()
    print(result[preview_columns].head(20))
|
|