| """ |
| src/processing/base_loader.py |
| ============================== |
| SOURCE-OF-TRUTH READER for all data/base/ CSVs. |
| |
| This module is the single entry-point for feature engineering. |
| It reads the real Kaggle historical CSVs + growing live-appended base files |
| and produces ML-ready DataFrames for classification and regression. |
| |
| Architecture rule: |
| - data/base/ → read here, appended by ingestion modules |
| - data/derived/ → written here (regenerated each run) |
| - data/raw/ → never read for features; audit trail only |
| |
| Base CSV → Feature mapping: |
| flight_cancellations.csv → cancellation_rate, is_cancelled per airport/date |
| airport_disruptions.csv → disruption_index, severity, flights_affected |
| airspace_closures.csv → airspace_risk_score per country/date |
| conflict_events.csv → conflict_event_count, conflict_intensity per region/date |
| flight_reroutes.csv → avg_delay_hours, extra_fuel_cost per route/date |
| airline_losses.csv → airline_exposure_score per airline |
| oil_prices.csv → oil_price, oil_price_change_pct (growing) |
| sentiment.csv → sentiment_score, sentiment_momentum (growing) |
| flight_prices.csv → price_usd per route/date (growing, for regression) |
| """ |
|
|
| import pandas as pd |
| import numpy as np |
| from pathlib import Path |
| from datetime import datetime, timedelta |
| from src.utils.logger import get_logger |
| from src.utils.io_utils import load_csv_safe |
| from config.settings import BASE_DIR, DERIVED_DIR |
|
|
| logger = get_logger(__name__) |
|
|
| |
|
|
# Ordinal encoding of free-text severity labels onto a 0-4 scale.
# Synonyms share a value so the mixed vocabularies across CSVs agree.
SEVERITY_MAP = {
    "Critical": 4, "Severe": 4,
    "High": 3,
    "Moderate": 2, "Medium": 2,
    "Low": 1, "Minor": 1,
    "Minimal": 0,
}


# Aviation-impact phrases → 0-4 scale. Matched by case-insensitive substring
# in load_conflict_events(); the long descriptive phrases are listed before
# their short prefixes so the most specific match wins first.
AVIATION_IMPACT_MAP = {
    "Severe — major flight disruptions and airspace closures": 4,
    "Severe": 4,
    "High — significant flight cancellations and rerouting": 3,
    "High": 3,
    "Moderate — some delays and precautionary rerouting": 2,
    "Moderate": 2,
    "Low — minor disruptions": 1,
    "Low": 1,
    "Minimal — early warning signs only": 0,
    "Minimal": 0,
    "None": 0,
}


# Per-region multiplier applied to conflict exposure (higher = more exposed).
# Regions missing from this map fall back to 0.8 in build_airport_daily_features().
REGION_CONFLICT_WEIGHT = {
    "Middle East": 1.5,
    "Eastern Europe": 1.3,
    "South Asia": 1.2,
    "Central Asia": 1.1,
    "Africa": 1.0,
    "Western Europe": 0.5,
    "North America": 0.4,
    "Asia-Pacific": 0.6,
    "Global": 0.8,
}
|
|
| |
| |
| |
| |
# City names and ICAO codes → IATA codes. All keys are upper-case; lookups
# go through normalise_iata(), which strips and upper-cases its input first.
CITY_TO_IATA: dict = {
    # Middle East
    "DUBAI": "DXB", "DOHA": "DOH", "ABU DHABI": "AUH", "RIYADH": "RUH",
    "JEDDAH": "JED", "KUWAIT CITY": "KWI", "MUSCAT": "MCT", "BAHRAIN": "BAH",
    "BEIRUT": "BEY", "AMMAN": "AMM", "TEL AVIV": "TLV", "TEHRAN": "THR",
    "BAGHDAD": "BGW", "BASRA": "BSR", "DAMASCUS": "DAM",
    # South Asia
    "KARACHI": "KHI", "ISLAMABAD": "ISB", "LAHORE": "LHE", "MUMBAI": "BOM",
    "DELHI": "DEL", "BANGALORE": "BLR", "CHENNAI": "MAA", "COLOMBO": "CMB",
    "DHAKA": "DAC", "KATHMANDU": "KTM",
    # Europe
    "LONDON": "LHR", "PARIS": "CDG", "FRANKFURT": "FRA", "AMSTERDAM": "AMS",
    "ISTANBUL": "IST", "MOSCOW": "SVO", "ROME": "FCO", "MADRID": "MAD",
    "ZURICH": "ZRH", "VIENNA": "VIE", "BRUSSELS": "BRU", "WARSAW": "WAW",
    "ATHENS": "ATH", "BUDAPEST": "BUD", "BUCHAREST": "OTP",
    # North Africa / Horn of Africa
    "CAIRO": "CAI", "CASABLANCA": "CMN", "TUNIS": "TUN", "ALGIERS": "ALG",
    "TRIPOLI": "TIP", "KHARTOUM": "KRT", "ADDIS ABABA": "ADD",
    # Asia-Pacific
    "SINGAPORE": "SIN", "BANGKOK": "BKK", "HONG KONG": "HKG", "TOKYO": "NRT",
    "BEIJING": "PEK", "SHANGHAI": "PVG", "SEOUL": "ICN", "SYDNEY": "SYD",
    "MELBOURNE": "MEL", "KUALA LUMPUR": "KUL", "JAKARTA": "CGK",
    # Americas
    "NEW YORK": "JFK", "LOS ANGELES": "LAX", "WASHINGTON": "IAD",
    "CHICAGO": "ORD", "MIAMI": "MIA", "TORONTO": "YYZ", "SAO PAULO": "GRU",
    # Sub-Saharan Africa
    "NAIROBI": "NBO", "JOHANNESBURG": "JNB", "ACCRA": "ACC",
    # ICAO → IATA aliases
    "OMDB": "DXB", "OTHH": "DOH", "OPKC": "KHI", "OJAI": "AMM",
    "LLBG": "TLV", "OKBK": "KWI", "OBBI": "BAH", "OLBA": "BEY",
    "OJAM": "AMM", "OIII": "IKA", "ORBI": "BGW", "ORMM": "BSR",
    "OSDI": "DAM", "EGLL": "LHR", "LFPG": "CDG", "EDDF": "FRA",
    "EHAM": "AMS", "LTFM": "IST", "UUEE": "SVO",
}


def normalise_iata(value: str) -> str:
    """Map a city name or ICAO code to its IATA code.

    The input is stripped and upper-cased first; values with no known
    mapping pass through unchanged (assumed to already be an IATA code).
    """
    key = str(value).strip().upper()
    return CITY_TO_IATA.get(key, key)
|
|
|
|
| |
| |
| |
# Conflict-location keywords → IATA airports plausibly affected.
# Matched by case-insensitive substring against upper-cased event `location`
# strings in get_conflict_zone_airports(); a location may hit several keywords.
_CONFLICT_LOCATION_TO_IATA: dict = {
    # Middle East
    "IRAN": ["THR", "IKA", "MHD"],
    "TEHRAN": ["THR", "IKA"],
    "IRAQ": ["BGW", "BSR", "NJF"],
    "BAGHDAD": ["BGW"],
    "BASRA": ["BSR"],
    "SYRIA": ["DAM"],
    "DAMASCUS": ["DAM"],
    "LEBANON": ["BEY"],
    "BEIRUT": ["BEY"],
    "YEMEN": ["SAH", "ADE"],
    "ISRAEL": ["TLV"],
    "TEL AVIV": ["TLV"],
    "GAZA": ["TLV"],
    "JORDAN": ["AMM"],
    "AMMAN": ["AMM"],
    "PERSIAN GULF": ["DXB", "DOH", "AUH", "BAH", "KWI", "MCT"],
    "GULF": ["DXB", "DOH", "AUH", "BAH", "KWI"],
    "STRAIT OF HORMUZ": ["THR", "DXB", "DOH", "MCT"],
    "RED SEA": ["SAH", "JED", "CAI"],
    "SAUDI ARABIA": ["RUH", "JED", "DMM"],
    "RIYADH": ["RUH"],
    "JEDDAH": ["JED"],
    "KUWAIT": ["KWI"],
    "BAHRAIN": ["BAH"],
    "OMAN": ["MCT"],
    "UAE": ["DXB", "AUH", "SHJ"],
    "DUBAI": ["DXB"],
    "DOHA": ["DOH"],
    "QATAR": ["DOH"],
    # Eastern Europe
    "UKRAINE": ["KBP", "HRK", "ODS"],
    "KYIV": ["KBP"],
    "KHARKIV": ["HRK"],
    "ODESSA": ["ODS"],
    "RUSSIA": ["SVO", "LED", "SVX"],
    "MOSCOW": ["SVO"],
    "BLACK SEA": ["KBP", "ODS"],
    "CRIMEA": ["KBP", "ODS"],
    # South Asia
    "PAKISTAN": ["KHI", "ISB", "LHE"],
    "KARACHI": ["KHI"],
    "ISLAMABAD": ["ISB"],
    "LAHORE": ["LHE"],
    "AFGHANISTAN": ["KBL"],
    "KABUL": ["KBL"],
    "KASHMIR": ["SXR", "ISB"],
    "INDIA": ["DEL", "BOM", "MAA", "CCU"],
    # Africa
    "LIBYA": ["TIP", "BEN"],
    "TRIPOLI": ["TIP"],
    "SUDAN": ["KRT"],
    "ETHIOPIA": ["ADD"],
    "SOMALIA": ["MGQ"],
}
|
|
|
|
def get_conflict_zone_airports(lookback_days: int = 90,
                               min_severity: str = "Medium") -> frozenset:
    """
    Build a dynamic set of conflict-zone IATA codes from recent high-severity
    conflict events in data/base/conflict_events.csv.

    Parameters
    ----------
    lookback_days : int
        Only consider events within this many days of today (default 90).
    min_severity : str
        Minimum event severity to include ("Low", "Medium", "High", "Critical").

    Returns
    -------
    frozenset of IATA airport codes that are currently in active conflict zones.
    Falls back to a hardcoded baseline set if the CSV is empty or missing.
    """
    # Local severity ranking (unrecognised min_severity defaults to Medium=2).
    severity_rank = {"Low": 1, "Minimal": 0, "Medium": 2, "Moderate": 2,
                     "High": 3, "Severe": 4, "Critical": 4}
    threshold = severity_rank.get(min_severity, 2)

    # Baseline returned whenever the CSV gives us nothing usable.
    baseline = frozenset([
        "TLV", "AMM", "BEY", "BGW", "DAM", "THR", "IKA",
        "KBP", "HRK", "ODS",
        "KHI", "ISB", "LHE", "KBL",
        "SAH", "TIP", "KRT",
    ])

    events = load_csv_safe(BASE_DIR / "conflict_events.csv")
    if events.empty:
        logger.info("conflict_events.csv empty — using baseline conflict airports")
        return baseline

    events["date"] = pd.to_datetime(events["date"], errors="coerce")
    window_start = pd.Timestamp.now() - pd.Timedelta(days=lookback_days)
    windowed = events[events["date"] >= window_start].copy()
    if windowed.empty:
        logger.info("No recent conflict events — using baseline conflict airports")
        return baseline

    # Normalise free-text severities ("HIGH", " high ") before ranking.
    windowed["_sev_val"] = windowed["severity"].map(
        lambda s: severity_rank.get(str(s).strip().title(), 0))
    windowed = windowed[windowed["_sev_val"] >= threshold]
    if windowed.empty:
        return baseline

    # Substring-match each event location against the keyword → airports map.
    matched: set = set()
    for place in windowed["location"].dropna().str.upper():
        matched.update(
            code
            for keyword, codes in _CONFLICT_LOCATION_TO_IATA.items()
            if keyword in place
            for code in codes
        )

    combined = baseline | frozenset(matched)
    logger.info(
        "Dynamic conflict-zone airports (%d recent events): %d airports — %s",
        len(windowed), len(combined), sorted(combined),
    )
    return combined
|
|
|
|
| |
|
|
def load_flight_cancellations() -> pd.DataFrame:
    """
    Read flight_cancellations.csv and normalise it for aggregation.

    Adds a parsed date, a per-row cancellation_flag of 1 (so counts can be
    summed), an iata_code derived from the origin column, and a country
    column (origin_country when present, otherwise "Unknown").
    """
    frame = load_csv_safe(BASE_DIR / "flight_cancellations.csv")
    if frame.empty:
        return frame
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    frame["cancellation_flag"] = 1
    # Origins may be city names or ICAO codes; fold them all into IATA.
    frame["iata_code"] = frame["origin"].map(normalise_iata)
    frame["country"] = frame.get("origin_country", "Unknown")
    return frame
|
|
|
|
def load_airport_disruptions() -> pd.DataFrame:
    """
    Read airport_disruptions.csv and normalise it.

    Maps the textual severity_level onto the numeric disruption_severity
    scale (0-4; unrecognised labels default to 2) and canonicalises
    iata_code to stripped upper-case.
    """
    frame = load_csv_safe(BASE_DIR / "airport_disruptions.csv")
    if frame.empty:
        return frame
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    frame["iata_code"] = frame["iata_code"].str.strip().str.upper()
    frame["disruption_severity"] = frame["severity_level"].map(SEVERITY_MAP).fillna(2)
    return frame
|
|
|
|
def load_airspace_closures() -> pd.DataFrame:
    """
    Load and normalise airspace_closures.csv.

    Derives airspace_risk_score (0-4) from closure duration and flights
    affected, each normalised by its column maximum.
    Adds: date (from closure_start_date) and closure_end columns.
    """
    df = load_csv_safe(BASE_DIR / "airspace_closures.csv")
    if df.empty:
        return df
    df["date"] = pd.to_datetime(df["closure_start_date"], errors="coerce")
    df["closure_end"] = pd.to_datetime(df["closure_end_date"], errors="coerce")

    # Bug fix: coerce to numeric BEFORE taking column maxima. The original
    # computed max() on the raw columns first, so string/NaN values could
    # yield a non-numeric or NaN max and corrupt the risk score below.
    # (df.get returns a scalar 0 when the column is missing, which then
    # broadcasts into a zero-filled column.)
    df["duration_hours"] = pd.to_numeric(df.get("duration_hours", 0), errors="coerce").fillna(0)
    df["flights_affected"] = pd.to_numeric(df.get("flights_affected", 0), errors="coerce").fillna(0)
    # After fillna(0) both maxima are finite and >= 0; +1 guards division by zero.
    max_dur = df["duration_hours"].max()
    max_flt = df["flights_affected"].max()
    df["airspace_risk_score"] = (
        (df["duration_hours"] / (max_dur + 1)) * 2 +
        (df["flights_affected"] / (max_flt + 1)) * 2
    ).clip(0, 4).round(2)
    return df
|
|
|
|
def load_conflict_events() -> pd.DataFrame:
    """
    Read conflict_events.csv and normalise it.

    Derives conflict_intensity in [0, 1] as a weighted blend of the encoded
    severity (60%) and encoded aviation impact (40%), and tags each event
    with a region inferred from its location text.
    """
    frame = load_csv_safe(BASE_DIR / "conflict_events.csv")
    if frame.empty:
        return frame
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    frame["severity_num"] = frame["severity"].map(SEVERITY_MAP).fillna(2)

    def _impact_score(raw) -> int:
        # First AVIATION_IMPACT_MAP key found as a case-insensitive substring
        # wins; long descriptive phrases precede their short prefixes in the
        # map, so the most specific label is tried first. Unknown text → 1.
        text = str(raw).lower()
        for label, score in AVIATION_IMPACT_MAP.items():
            if str(label).lower() in text:
                return score
        return 1

    frame["aviation_impact_num"] = frame["aviation_impact"].apply(_impact_score)
    frame["conflict_intensity"] = (
        0.6 * (frame["severity_num"] / 4) + 0.4 * (frame["aviation_impact_num"] / 4)
    ).round(4)
    frame["region"] = frame["location"].apply(_infer_region)
    return frame
|
|
|
|
def load_flight_reroutes() -> pd.DataFrame:
    """
    Read flight_reroutes.csv and normalise it.

    Ensures delay_hours and extra_fuel_cost_usd are numeric (missing or
    invalid values become 0) and derives iata_code from the origin column.
    """
    frame = load_csv_safe(BASE_DIR / "flight_reroutes.csv")
    if frame.empty:
        return frame
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    for col in ("extra_fuel_cost_usd", "delay_hours"):
        frame[col] = pd.to_numeric(frame.get(col, 0), errors="coerce").fillna(0)
    frame["iata_code"] = frame["origin"].map(normalise_iata)
    return frame
|
|
|
|
def load_airline_losses() -> pd.DataFrame:
    """Read airline_losses.csv and score each airline's financial exposure.

    airline_exposure_score scales estimated_loss_usd relative to the largest
    loss in the file (roughly 0-100, rounded to 2 dp).
    """
    frame = load_csv_safe(BASE_DIR / "airline_losses.csv")
    if frame.empty:
        return frame
    losses = pd.to_numeric(frame.get("estimated_loss_usd", 0), errors="coerce").fillna(0)
    frame["estimated_loss_usd"] = losses
    # +1 in the denominator guards against division by zero when all losses are 0.
    frame["airline_exposure_score"] = (losses / (losses.max() + 1) * 100).round(2)
    return frame
|
|
|
|
def load_oil_prices() -> pd.DataFrame:
    """
    Load oil_prices.csv from data/base/ (growing daily via yfinance).

    Falls back to data/derived/oil_prices.csv when the base file is empty or
    missing. When any rows load, guarantees:
      - rows sorted by parsed date,
      - an oil_price column aliased from the first recognised price column
        (when one exists),
      - an oil_price_change_pct column: kept as-is if already present,
        otherwise derived as day-over-day percent change, or 0.0 when no
        price column can be found.
    """
    df = load_csv_safe(BASE_DIR / "oil_prices.csv")
    if df.empty:
        logger.warning("data/base/oil_prices.csv not found — using fallback from data/derived/")
        df = load_csv_safe(DERIVED_DIR / "oil_prices.csv")
    if df.empty:
        return df
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.sort_values("date")

    price_col = next((c for c in ["brent_usd", "brent_price_usd", "oil_price"] if c in df.columns), None)
    # Bug fix: an existing oil_price_change_pct column is now preserved even
    # when no recognised price column exists — the original branch order
    # clobbered it with 0.0 in that case.
    if "oil_price_change_pct" in df.columns:
        df["oil_price_change_pct"] = df["oil_price_change_pct"].fillna(0)
    elif price_col is not None:
        df["oil_price_change_pct"] = df[price_col].pct_change().fillna(0) * 100
    else:
        df["oil_price_change_pct"] = 0.0

    # Canonical alias so downstream code can always read df["oil_price"].
    if "oil_price" not in df.columns and price_col:
        df["oil_price"] = df[price_col]
    return df
|
|
|
|
def load_sentiment() -> pd.DataFrame:
    """Load sentiment.csv from data/base/ (growing via GDELT).

    Falls back to data/derived/sentiment.csv when the base file is empty;
    parses the timestamp column whenever any rows were loaded.
    """
    frame = load_csv_safe(BASE_DIR / "sentiment.csv")
    if frame.empty:
        logger.warning("data/base/sentiment.csv not found — using fallback from data/derived/")
        frame = load_csv_safe(DERIVED_DIR / "sentiment.csv")
    if not frame.empty:
        frame["timestamp"] = pd.to_datetime(frame["timestamp"], errors="coerce")
    return frame
|
|
|
|
def load_flight_prices() -> pd.DataFrame:
    """Load flight_prices.csv from data/base/ (growing via SerpApi).

    When rows are present, parses a timestamp column (falling back to the
    date column if timestamp is absent).
    """
    frame = load_csv_safe(BASE_DIR / "flight_prices.csv")
    if frame.empty:
        return frame
    raw_ts = frame.get("timestamp", frame.get("date"))
    frame["timestamp"] = pd.to_datetime(raw_ts, errors="coerce")
    return frame
|
|
|
|
| |
|
|
| _REGION_KEYWORDS = { |
| "Middle East": ["iran", "iraq", "israel", "gaza", "yemen", "syria", |
| "uae", "dubai", "tehran", "beirut", "jordan", "saudi", |
| "bahrain", "qatar", "kuwait", "oman"], |
| "Eastern Europe": ["ukraine", "russia", "kyiv", "moscow", "poland", |
| "romania", "moldova", "belarus", "donbas"], |
| "South Asia": ["pakistan", "india", "afghanistan", "karachi", "delhi", |
| "kabul", "lahore"], |
| "Central Asia": ["kazakhstan", "uzbekistan", "tajikistan", "turkmenistan"], |
| "Africa": ["ethiopia", "sudan", "somalia", "libya", "mali", "niger", |
| "nigeria", "eritrea"], |
| "Western Europe": ["france", "germany", "uk", "london", "paris", |
| "brussels", "netherlands"], |
| "North America": ["usa", "united states", "canada", "mexico"], |
| "Asia-Pacific": ["china", "japan", "korea", "taiwan", "philippines", |
| "vietnam", "myanmar"], |
| } |
|
|
|
|
| def _infer_region(location: str) -> str: |
| loc = str(location).lower() |
| for region, keywords in _REGION_KEYWORDS.items(): |
| if any(kw in loc for kw in keywords): |
| return region |
| return "Global" |
|
|
|
|
| |
|
|
def build_airport_daily_features() -> pd.DataFrame:
    """
    Aggregate all base CSVs into a per-(airport, date) feature table.
    This is the primary input for classification feature engineering.

    Returns columns:
        date, iata_code, region, country,
        cancellation_count, cancellation_rate,
        disruption_severity, flights_affected, disruption_index_raw,
        airspace_risk_score (from closures),
        avg_delay_hours, extra_fuel_cost_sum,
        conflict_event_count, conflict_intensity_max,
        oil_price, oil_price_change_pct
    """
    # 1) Cancellations → counts per (date, airport).
    cancel_df = load_flight_cancellations()
    if not cancel_df.empty:
        cancel_agg = (
            cancel_df.groupby(["date", "iata_code"])
            .agg(
                cancellation_count=("cancellation_flag", "sum"),
                passengers_affected=("passengers_affected", "sum"),
                country=("country", "first"),
            )
            .reset_index()
        )
    else:
        # Empty frame with the expected schema keeps later merges from KeyError-ing.
        cancel_agg = pd.DataFrame(columns=["date", "iata_code", "cancellation_count",
                                           "passengers_affected", "country"])

    # 2) Disruptions → worst severity + summed flights per (date, airport).
    disrupt_df = load_airport_disruptions()
    if not disrupt_df.empty:
        disrupt_agg = (
            disrupt_df.groupby(["date", "iata_code"])
            .agg(
                disruption_severity=("disruption_severity", "max"),
                flights_affected=("flights_affected", "sum"),
                duration_hours=("duration_hours", "max"),
                region=("region", "first"),
                country=("country", "first"),
            )
            .reset_index()
        )
    else:
        disrupt_agg = pd.DataFrame(columns=["date", "iata_code", "disruption_severity",
                                            "flights_affected", "duration_hours",
                                            "region", "country"])

    # 3) Reroutes → mean delay / summed extra fuel per (date, airport).
    reroute_df = load_flight_reroutes()
    if not reroute_df.empty:
        reroute_agg = (
            reroute_df.groupby(["date", "iata_code"])
            .agg(
                avg_delay_hours=("delay_hours", "mean"),
                extra_fuel_cost_sum=("extra_fuel_cost_usd", "sum"),
                reroute_count=("flight_number", "count"),
            )
            .reset_index()
        )
    else:
        reroute_agg = pd.DataFrame(columns=["date", "iata_code", "avg_delay_hours",
                                            "extra_fuel_cost_sum", "reroute_count"])

    # 4) Airspace closures: expand each closure into one row per calendar day
    #    it spans, then keep the max risk score per (date, country).
    airspace_df = load_airspace_closures()
    if not airspace_df.empty:
        expanded_rows = []
        for _, row in airspace_df.iterrows():
            start = pd.to_datetime(row.get("date")).normalize() if pd.notna(row.get("date")) else None
            end = pd.to_datetime(row.get("closure_end")).normalize() if pd.notna(row.get("closure_end")) else None
            if start is None:
                continue
            if end is None or end < start:
                # No/invalid end date: assume the closure lasts duration_hours
                # (default 24h) from its start.
                dur = float(row.get("duration_hours", 24))
                end = (start + pd.Timedelta(hours=dur)).normalize()
            for d in pd.date_range(start, end, freq="D"):
                expanded_rows.append({
                    "date": d,
                    "country": row.get("country", ""),
                    "airspace_risk_score": row.get("airspace_risk_score", 0),
                })
        if expanded_rows:
            exp_df = pd.DataFrame(expanded_rows)
            airspace_agg = (
                exp_df.groupby(["date", "country"])
                .agg(airspace_risk_score=("airspace_risk_score", "max"))
                .reset_index()
            )
        else:
            airspace_agg = pd.DataFrame(columns=["date", "country", "airspace_risk_score"])
    else:
        airspace_agg = pd.DataFrame(columns=["date", "country", "airspace_risk_score"])

    # 5) Conflict events → count and max intensity per (date, region).
    conflict_df = load_conflict_events()
    if not conflict_df.empty:
        conflict_agg = (
            conflict_df.groupby(["date", "region"])
            .agg(
                conflict_event_count=("event_type", "count"),
                conflict_intensity_max=("conflict_intensity", "max"),
            )
            .reset_index()
        )
    else:
        conflict_agg = pd.DataFrame(columns=["date", "region",
                                             "conflict_event_count", "conflict_intensity_max"])

    # 6) Spine: outer-join disruptions and cancellations on (date, airport).
    if not disrupt_agg.empty and not cancel_agg.empty:
        merged = pd.merge(disrupt_agg, cancel_agg,
                          on=["date", "iata_code"], how="outer",
                          suffixes=("_d", "_c"))
        # Both sides carry a country column; prefer the disruption one.
        merged["country"] = merged["country_d"].combine_first(merged["country_c"])
        merged.drop(columns=["country_d", "country_c"], errors="ignore", inplace=True)
    elif not disrupt_agg.empty:
        merged = disrupt_agg.copy()
        merged["cancellation_count"] = 0
    elif not cancel_agg.empty:
        merged = cancel_agg.copy()
        merged["disruption_severity"] = 0
        merged["flights_affected"] = 0
        merged["duration_hours"] = 0
    else:
        logger.warning("No airport disruption or cancellation data available")
        return pd.DataFrame()

    # 7) Left-join reroute aggregates onto the spine.
    if not reroute_agg.empty:
        merged = pd.merge(merged, reroute_agg, on=["date", "iata_code"], how="left")

    # 8) Backfill region from the disruption data's country→region pairs,
    #    defaulting to "Global" when nothing matches.
    if "region" not in merged.columns or merged["region"].isna().any():
        merged["region"] = merged.get("region", pd.Series("Global", index=merged.index))
        country_region_map = (
            disrupt_df[["country", "region"]].drop_duplicates().set_index("country")["region"].to_dict()
            if not disrupt_df.empty else {}
        )
        merged["region"] = merged["region"].combine_first(
            merged["country"].map(country_region_map)
        ).fillna("Global")

    # 9) Airspace risk joins on (date, country); normalise dates to midnight
    #    first so equality matching works.
    if not airspace_agg.empty:
        merged["date"] = pd.to_datetime(merged["date"]).dt.normalize()
        airspace_agg["date"] = pd.to_datetime(airspace_agg["date"]).dt.normalize()
        merged = pd.merge(merged, airspace_agg, on=["date", "country"], how="left")

    # 10) Conflict aggregates join on (date, region).
    if not conflict_agg.empty:
        merged = pd.merge(merged, conflict_agg, on=["date", "region"], how="left")

    # 11) Attach the nearest oil price within 7 days of each row's date.
    oil_df = load_oil_prices()
    if not oil_df.empty:
        oil_df["date"] = pd.to_datetime(oil_df["date"])
        merged["date"] = pd.to_datetime(merged["date"])
        oil_price_col = next((c for c in ["oil_price", "brent_usd", "brent_price_usd"] if c in oil_df.columns), None)
        if oil_price_col and "oil_price" not in oil_df.columns:
            oil_df["oil_price"] = oil_df[oil_price_col]
        oil_cols = ["date", "oil_price", "oil_price_change_pct"]
        oil_cols = [c for c in oil_cols if c in oil_df.columns]
        merged = pd.merge_asof(
            merged.dropna(subset=["date"]).sort_values("date"),
            oil_df[oil_cols].dropna(subset=["date"]).sort_values("date"),
            on="date",
            direction="nearest",
            tolerance=pd.Timedelta("7d"),
        )

    # 12) Guarantee every numeric feature exists and fill gaps with defaults.
    num_defaults = {
        "cancellation_count": 0,
        "passengers_affected": 0,
        "disruption_severity": 0,
        "flights_affected": 0,
        "duration_hours": 0,
        "avg_delay_hours": 0,
        "extra_fuel_cost_sum": 0,
        "reroute_count": 0,
        "airspace_risk_score": 0,
        "conflict_event_count": 0,
        "conflict_intensity_max": 0,
        "oil_price": 85.0,  # fallback price when no oil data merged — TODO confirm level
        "oil_price_change_pct": 0.0,
    }
    for col, val in num_defaults.items():
        if col not in merged.columns:
            merged[col] = val
        else:
            merged[col] = pd.to_numeric(merged[col], errors="coerce").fillna(val)

    # 13) Composite 0-100 disruption index: severity 40%, delay 30%,
    #     airspace risk 20%, cancellations 10%.
    # NOTE(review): max_flights is computed but not used in the formula below.
    max_flights = merged["flights_affected"].max() or 1
    merged["disruption_index"] = (
        (merged["disruption_severity"] / 4) * 40 +
        (merged["avg_delay_hours"] / 24).clip(0, 1) * 30 +
        (merged["airspace_risk_score"] / 4) * 20 +
        (merged["cancellation_count"] / 50).clip(0, 1) * 10
    ).clip(0, 100).round(2)

    # 14) Cancellation rate relative to all affected flights (+1 avoids /0).
    total = merged["cancellation_count"] + merged["flights_affected"] + 1
    merged["cancellation_rate"] = (merged["cancellation_count"] / total).round(4)

    # 15) Regional exposure weight (regions not in the map → 0.8).
    merged["region_weight"] = merged["region"].map(REGION_CONFLICT_WEIGHT).fillna(0.8)

    # 16) Binary classification target.
    merged["is_high_disruption"] = (merged["disruption_index"] > 30).astype(int)

    # 17) Serialisable timestamp (fixed at noon) and plain date string.
    merged["timestamp"] = (
        pd.to_datetime(merged["date"]).apply(
            lambda d: d.strftime("%Y-%m-%dT12:00:00") if pd.notna(d) else None
        )
    )
    merged["date_str"] = pd.to_datetime(merged["date"]).dt.strftime("%Y-%m-%d")

    logger.info("Airport daily features: %d rows (from real base data)", len(merged))
    return merged
|
|
|
|
def build_sentiment_daily() -> pd.DataFrame:
    """
    Aggregate sentiment.csv into daily regional averages.

    Returns: date, region, sentiment_score, sentiment_momentum, article_count.
    sentiment_momentum is the day-over-day change of each region's average.
    """
    raw = load_sentiment()
    if raw.empty:
        return pd.DataFrame()

    raw["date"] = raw["timestamp"].dt.date
    # article_count: sum a real count column when present, else count rows.
    if "article_count" in raw.columns:
        count_spec = ("article_count", "sum")
    else:
        count_spec = ("sentiment_score", "count")

    daily = (
        raw.groupby(["date", "region"])
        .agg(sentiment_score=("sentiment_score", "mean"),
             article_count=count_spec)
        .reset_index()
    )
    daily["date"] = pd.to_datetime(daily["date"])

    # Momentum must be computed within each region in date order.
    daily = daily.sort_values(["region", "date"])
    daily["sentiment_momentum"] = daily.groupby("region")["sentiment_score"].diff().fillna(0)
    return daily
|
|
|
|
def build_classification_input() -> pd.DataFrame:
    """
    Build the full ML-ready classification DataFrame from data/base/ CSVs.
    Merges airport daily features with sentiment and computes final feature set.

    Returns a DataFrame with CLASSIFIER_FEATURES + CLASSIFIER_TARGET columns.
    """
    # Imported lazily so importing this module does not require model settings.
    from config.settings import CLASSIFIER_FEATURES, CLASSIFIER_TARGET

    airport_df = build_airport_daily_features()
    if airport_df.empty:
        logger.error("No airport data available for classification input")
        return pd.DataFrame()

    # Attach the nearest (within 3 days) per-region sentiment to each row.
    sentiment_df = build_sentiment_daily()
    if not sentiment_df.empty:
        airport_df["date_dt"] = pd.to_datetime(airport_df["date"])
        sentiment_df["date_dt"] = pd.to_datetime(sentiment_df["date"])
        airport_df = pd.merge_asof(
            airport_df.sort_values("date_dt"),
            sentiment_df[["date_dt", "region", "sentiment_score", "sentiment_momentum"]].sort_values("date_dt"),
            on="date_dt",
            by="region",
            tolerance=pd.Timedelta("3d"),
            direction="nearest",
            suffixes=("", "_sent"),
        )
        # If airport_df already carried sentiment columns, the incoming ones
        # get the _sent suffix; fold them in and drop the duplicates.
        for col in ["sentiment_score", "sentiment_momentum"]:
            if f"{col}_sent" in airport_df.columns:
                airport_df[col] = airport_df[col].combine_first(airport_df[f"{col}_sent"])
                airport_df.drop(columns=[f"{col}_sent"], inplace=True, errors="ignore")

    # Map raw feature names onto the names the classifier expects
    # (identity entries are kept for documentation of the full feature set).
    rename_map = {
        "airspace_risk_score": "airspace_risk_score",
        "cancellation_rate": "cancellation_rate_24h",
        "avg_delay_hours": "avg_delay_24h",
        "oil_price_change_pct": "oil_price_change_pct",
        "conflict_event_count": "conflict_event_count",
        "disruption_index": "disruption_index_lag6h",
    }
    for old, new in rename_map.items():
        if old in airport_df.columns and new not in airport_df.columns:
            airport_df.rename(columns={old: new}, inplace=True)

    # Guarantee every classifier feature exists and is numeric (NaN → 0).
    for feat in CLASSIFIER_FEATURES:
        if feat not in airport_df.columns:
            airport_df[feat] = 0.0
        airport_df[feat] = pd.to_numeric(airport_df[feat], errors="coerce").fillna(0)

    # Composite 0-100 stress score (disruption 50%, cancellation rate 30%,
    # airspace risk 20%), derived only if not already supplied upstream.
    if "airport_stress_score" not in airport_df.columns:
        airport_df["airport_stress_score"] = (
            airport_df.get("disruption_index_lag6h", 0) * 0.5 +
            airport_df.get("cancellation_rate_24h", 0) * 100 * 0.3 +
            airport_df.get("airspace_risk_score", 0) / 4 * 100 * 0.2
        ).clip(0, 100).round(2)

    # Keep identifiers plus whichever features/target are actually present;
    # rows without a target label are dropped.
    result_cols = (
        ["timestamp", "date_str", "iata_code", "country", "region"] +
        CLASSIFIER_FEATURES + [CLASSIFIER_TARGET]
    )
    result_cols = [c for c in result_cols if c in airport_df.columns]
    result = airport_df[result_cols].dropna(subset=[CLASSIFIER_TARGET])

    logger.info("Classification input: %d rows | positive rate: %.1f%% (from REAL base data)",
                len(result), result[CLASSIFIER_TARGET].mean() * 100)
    return result
|
|
|
|
def build_regression_input() -> pd.DataFrame:
    """
    Build the ML-ready regression DataFrame for flight price prediction.
    Source: data/base/flight_prices.csv (growing via SerpApi).
    Falls back to data/derived/flight_prices.csv if base file is empty.

    Returns a DataFrame with REGRESSOR_FEATURES + REGRESSOR_TARGET columns.
    """
    # Imported lazily so importing this module does not require model settings.
    from config.settings import REGRESSOR_FEATURES, REGRESSOR_TARGET

    prices_df = load_flight_prices()
    if prices_df.empty:
        logger.warning("data/base/flight_prices.csv is empty — "
                       "falling back to derived prices (synthetic). "
                       "Run SerpApi ingestion to populate real price data.")
        prices_df = load_csv_safe(DERIVED_DIR / "flight_prices.csv")
    if prices_df.empty:
        return pd.DataFrame()

    # Attach the nearest oil price within 7 days of each quote.
    oil_df = load_oil_prices()
    if not oil_df.empty:
        prices_df["date_dt"] = pd.to_datetime(
            prices_df.get("timestamp", prices_df.get("date")), errors="coerce")
        oil_df["date"] = pd.to_datetime(oil_df["date"])
        oil_price_col = next((c for c in ["oil_price", "brent_usd", "brent_price_usd"] if c in oil_df.columns), None)
        if oil_price_col and "oil_price" not in oil_df.columns:
            oil_df["oil_price"] = oil_df[oil_price_col]
        oil_reg_cols = ["date", "oil_price", "oil_price_change_pct"]
        oil_reg_cols = [c for c in oil_reg_cols if c in oil_df.columns]
        prices_df = pd.merge_asof(
            prices_df.sort_values("date_dt"),
            oil_df[oil_reg_cols].sort_values("date"),
            left_on="date_dt", right_on="date",
            direction="nearest",
            tolerance=pd.Timedelta("7d"),
        )
    if "oil_price" not in prices_df.columns:
        prices_df["oil_price"] = 85.0  # fallback when no oil data available

    # Attach the mean airport disruption index by date as a route-level proxy.
    airport_df = build_airport_daily_features()
    if not airport_df.empty and "route" in prices_df.columns:
        prices_df["origin"] = prices_df.get("origin", prices_df["route"].str.split("-").str[0])
        prices_df["date_dt2"] = pd.to_datetime(prices_df.get("timestamp", prices_df.get("date")), errors="coerce")
        airport_df["date_dt2"] = pd.to_datetime(airport_df["date"])
        route_disrupt = (
            airport_df.groupby("date_dt2")
            ["disruption_index"].mean().reset_index()
            .rename(columns={"disruption_index": "disruption_index_route"})
        )
        prices_df = pd.merge_asof(
            prices_df.sort_values("date_dt2"),
            route_disrupt.sort_values("date_dt2"),
            on="date_dt2",
            direction="nearest",
            tolerance=pd.Timedelta("7d"),
        )
    if "disruption_index" not in prices_df.columns:
        prices_df["disruption_index"] = prices_df.get("disruption_index_route", 0)

    # Best-effort flag for routes touching active conflict-zone airports.
    try:
        _conflict_airports = get_conflict_zone_airports(lookback_days=90, min_severity="Medium")
        if _conflict_airports:
            def _is_conflict_route(row) -> int:
                # 1 when either endpoint is a conflict-zone airport.
                orig = str(row.get("origin", "")).strip().upper()
                dest = str(row.get("destination", "")).strip().upper()
                return int(orig in _conflict_airports or dest in _conflict_airports)
            prices_df["route_conflict_flag"] = prices_df.apply(_is_conflict_route, axis=1)
            n_flagged = int(prices_df["route_conflict_flag"].sum())
            logger.info(
                "Dynamic route_conflict_flag: %d / %d routes flagged as conflict-zone",
                n_flagged, len(prices_df),
            )
    except Exception as _rcf_err:
        # Non-fatal: the flag is an enrichment, not a required feature.
        logger.warning("Dynamic route_conflict_flag failed (non-fatal): %s", _rcf_err)

    # Guarantee every regressor feature exists and is numeric (NaN → 0).
    for feat in REGRESSOR_FEATURES:
        if feat not in prices_df.columns:
            prices_df[feat] = 0.0
        prices_df[feat] = pd.to_numeric(prices_df[feat], errors="coerce").fillna(0)

    # Bug fix: removed a dead `prices_df.get(REGRESSOR_TARGET)` assignment
    # whose result was never used; this explicit check is the real guard.
    if REGRESSOR_TARGET not in prices_df.columns:
        logger.error("Regression target '%s' not found", REGRESSOR_TARGET)
        return pd.DataFrame()

    # Keep identifiers plus whichever features/target are actually present;
    # rows without a target price are dropped.
    result_cols = (["timestamp", "route", "origin", "destination"] +
                   REGRESSOR_FEATURES + [REGRESSOR_TARGET])
    result_cols = [c for c in result_cols if c in prices_df.columns]
    result = prices_df[result_cols].dropna(subset=[REGRESSOR_TARGET])

    logger.info("Regression input: %d rows | price range $%.0f–$%.0f",
                len(result), result[REGRESSOR_TARGET].min(), result[REGRESSOR_TARGET].max())
    return result
|
|
|
|
| |
|
|
def load_all_base_summary() -> dict:
    """
    Load every base CSV once and return the DataFrames keyed by dataset name.

    Intended for the dashboard so individual tabs don't each re-read disk.
    """
    loaders = {
        "flight_cancellations": load_flight_cancellations,
        "airport_disruptions": load_airport_disruptions,
        "airspace_closures": load_airspace_closures,
        "conflict_events": load_conflict_events,
        "flight_reroutes": load_flight_reroutes,
        "airline_losses": load_airline_losses,
        "oil_prices": load_oil_prices,
        "sentiment": load_sentiment,
        "flight_prices": load_flight_prices,
    }
    return {name: loader() for name, loader in loaders.items()}
|
|
|
|
if __name__ == "__main__":
    # Smoke-test each builder against whatever base CSVs are on disk.
    print("=== Base Loader Self-Test ===\n")

    print("Airport daily features:")
    airport = build_airport_daily_features()
    print(f" {len(airport)} rows")
    if not airport.empty:
        preview_cols = ["date", "iata_code", "region", "disruption_index",
                        "cancellation_count", "is_high_disruption"]
        print(airport[preview_cols].head(10).to_string(index=False))

    print("\nClassification input:")
    clf = build_classification_input()
    if clf.empty:
        print(" empty")
    else:
        print(f" {len(clf)} rows | positive rate: {clf['is_high_disruption'].mean():.1%}")

    print("\nRegression input:")
    reg = build_regression_input()
    if reg.empty:
        print(" empty (no flight_prices data yet)")
    else:
        print(f" {len(reg)} rows")
|
|