"""
src/processing/base_loader.py
==============================
SOURCE-OF-TRUTH READER for all data/base/ CSVs.

This module is the single entry-point for feature engineering.
It reads the real Kaggle historical CSVs + growing live-appended base files
and produces ML-ready DataFrames for classification and regression.

Architecture rule:
  - data/base/     → read here, appended by ingestion modules
  - data/derived/  → written here (regenerated each run)
  - data/raw/      → never read for features; audit trail only

Base CSV → Feature mapping:
  flight_cancellations.csv → cancellation_rate, is_cancelled per airport/date
  airport_disruptions.csv  → disruption_index, severity, flights_affected
  airspace_closures.csv    → airspace_risk_score per country/date
  conflict_events.csv      → conflict_event_count, conflict_intensity per region/date
  flight_reroutes.csv      → avg_delay_hours, extra_fuel_cost per route/date
  airline_losses.csv       → airline_exposure_score per airline
  oil_prices.csv           → oil_price, oil_price_change_pct (growing)
  sentiment.csv            → sentiment_score, sentiment_momentum (growing)
  flight_prices.csv        → price_usd per route/date (growing, for regression)
"""

import re

import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta

from src.utils.logger import get_logger
from src.utils.io_utils import load_csv_safe
from config.settings import BASE_DIR, DERIVED_DIR

logger = get_logger(__name__)

# ── Severity / Risk Encoding ──────────────────────────────────────────────────

SEVERITY_MAP = {
    "Critical": 4,
    "Severe": 4,
    "High": 3,
    "Moderate": 2,
    "Medium": 2,
    "Low": 1,
    "Minor": 1,
    "Minimal": 0,
}

AVIATION_IMPACT_MAP = {
    "Severe — major flight disruptions and airspace closures": 4,
    "Severe": 4,
    "High — significant flight cancellations and rerouting": 3,
    "High": 3,
    "Moderate — some delays and precautionary rerouting": 2,
    "Moderate": 2,
    "Low — minor disruptions": 1,
    "Low": 1,
    "Minimal — early warning signs only": 0,
    "Minimal": 0,
    "None": 0,
}

REGION_CONFLICT_WEIGHT = {
    "Middle East": 1.5,
    "Eastern Europe": 1.3,
    "South Asia": 1.2,
    "Central Asia": 1.1,
    "Africa": 1.0,
    "Western Europe": 0.5,
    "North America": 0.4,
    "Asia-Pacific": 0.6,
    "Global": 0.8,
}

# ── City name → IATA code lookup ──────────────────────────────────────────────
# flight_cancellations.csv stores city names in the 'origin' column.
# This mapping normalises them to 3-letter IATA codes so they join correctly
# with airport_disruptions.csv (which uses proper IATA codes).
CITY_TO_IATA: dict = {
    # Middle East
    "DUBAI": "DXB", "DOHA": "DOH", "ABU DHABI": "AUH", "RIYADH": "RUH",
    "JEDDAH": "JED", "KUWAIT CITY": "KWI", "MUSCAT": "MCT", "BAHRAIN": "BAH",
    "BEIRUT": "BEY", "AMMAN": "AMM", "TEL AVIV": "TLV", "TEHRAN": "THR",
    "BAGHDAD": "BGW", "BASRA": "BSR", "DAMASCUS": "DAM",
    # South Asia
    "KARACHI": "KHI", "ISLAMABAD": "ISB", "LAHORE": "LHE", "MUMBAI": "BOM",
    "DELHI": "DEL", "BANGALORE": "BLR", "CHENNAI": "MAA", "COLOMBO": "CMB",
    "DHAKA": "DAC", "KATHMANDU": "KTM",
    # Europe
    "LONDON": "LHR", "PARIS": "CDG", "FRANKFURT": "FRA", "AMSTERDAM": "AMS",
    "ISTANBUL": "IST", "MOSCOW": "SVO", "ROME": "FCO", "MADRID": "MAD",
    "ZURICH": "ZRH", "VIENNA": "VIE", "BRUSSELS": "BRU", "WARSAW": "WAW",
    "ATHENS": "ATH", "BUDAPEST": "BUD", "BUCHAREST": "OTP",
    # North Africa
    "CAIRO": "CAI", "CASABLANCA": "CMN", "TUNIS": "TUN", "ALGIERS": "ALG",
    "TRIPOLI": "TIP", "KHARTOUM": "KRT", "ADDIS ABABA": "ADD",
    # Asia Pacific
    "SINGAPORE": "SIN", "BANGKOK": "BKK", "HONG KONG": "HKG", "TOKYO": "NRT",
    "BEIJING": "PEK", "SHANGHAI": "PVG", "SEOUL": "ICN", "SYDNEY": "SYD",
    "MELBOURNE": "MEL", "KUALA LUMPUR": "KUL", "JAKARTA": "CGK",
    # Americas
    "NEW YORK": "JFK", "LOS ANGELES": "LAX", "WASHINGTON": "IAD",
    "CHICAGO": "ORD", "MIAMI": "MIA", "TORONTO": "YYZ", "SAO PAULO": "GRU",
    # Other
    "NAIROBI": "NBO", "JOHANNESBURG": "JNB", "ACCRA": "ACC",
    "OMDB": "DXB",  # ICAO codes sometimes appear in origin field
    "OTHH": "DOH", "OPKC": "KHI", "OJAI": "AMM", "LLBG": "TLV",
    "OKBK": "KWI", "OBBI": "BAH", "OLBA": "BEY", "OJAM": "AMM",
    "OIII": "IKA", "ORBI": "BGW", "ORMM": "BSR", "OSDI": "DAM",
    "EGLL": "LHR", "LFPG": "CDG", "EDDF": "FRA", "EHAM": "AMS",
    "LTFM": "IST", "UUEE": "SVO",
}


def normalise_iata(value: str) -> str:
    """Convert city name or ICAO code to IATA code; pass through if already IATA.

    The value is stringified, stripped and upper-cased before the lookup in
    CITY_TO_IATA; anything not found there is returned in that cleaned form.
    """
    v = str(value).strip().upper()
    return CITY_TO_IATA.get(v, v)


# ── Shared private helpers ────────────────────────────────────────────────────

def _keyword_regex(keyword: str) -> str:
    """Return a regex fragment that matches *keyword* at the start of a word.

    Plain substring tests produce false positives ("oman" inside "Romania",
    "usa" inside "Jerusalem").  A leading word boundary removes those while
    still allowing derived forms such as "Omani" or "Indian".  Keywords of
    three characters or fewer ("uk", "uae", "usa") must match a whole word,
    otherwise "uk" would still hit "Ukrainian".
    """
    escaped = re.escape(keyword)
    if len(keyword) <= 3:
        return rf"\b{escaped}\b"
    return rf"\b{escaped}"


def _numeric_column(df: pd.DataFrame, column: str, default: float = 0) -> pd.Series:
    """Return df[column] coerced to numeric with NaN → default.

    When the column is absent a constant Series aligned to df.index is
    returned.  (pd.to_numeric on a scalar returns a scalar, which has no
    .fillna — the pattern `pd.to_numeric(df.get(col, 0)).fillna(0)` therefore
    raises AttributeError for a missing column.)
    """
    if column not in df.columns:
        return pd.Series(default, index=df.index)
    return pd.to_numeric(df[column], errors="coerce").fillna(default)


def _encode_severity(labels: pd.Series) -> pd.Series:
    """Map severity labels to 0-4 via SEVERITY_MAP; unknown labels become 2.

    Labels are stripped and title-cased first so that "critical" or " High "
    are recognised, matching the normalisation used in
    get_conflict_zone_airports.
    """
    cleaned = labels.astype(str).str.strip().str.title()
    return cleaned.map(SEVERITY_MAP).fillna(2)


def _encode_aviation_impact(text) -> int:
    """Return the score of the first AVIATION_IMPACT_MAP label contained in
    *text* (case-insensitive substring test, dict order); 1 if none match."""
    lowered = str(text).lower()
    return next(
        (score for label, score in AVIATION_IMPACT_MAP.items()
         if str(label).lower() in lowered),
        1,
    )


def _merge_asof_nearest(left: pd.DataFrame, right: pd.DataFrame, on: str,
                        tolerance: str = "7d", **kwargs) -> pd.DataFrame:
    """pd.merge_asof(direction="nearest") that tolerates null keys.

    merge_asof raises when the key column contains nulls.  Rows of *left*
    with a null key are set aside, the remaining rows are joined, and the
    set-aside rows are appended back (with NaN in the joined columns) so no
    input row is lost.  Rows of *right* with a null key are dropped.
    If no left row has a key, *left* is returned unchanged.
    """
    has_key = left[on].notna()
    if not has_key.any():
        return left
    joined = pd.merge_asof(
        left[has_key].sort_values(on),
        right.dropna(subset=[on]).sort_values(on),
        on=on,
        direction="nearest",
        tolerance=pd.Timedelta(tolerance),
        **kwargs,
    )
    if has_key.all():
        return joined
    return pd.concat([joined, left[~has_key]], ignore_index=True, sort=False)


# ── Conflict-location → IATA airport lookup ───────────────────────────────────
# Maps keywords found in conflict_events.csv 'location' column to nearby
# major airports that would be affected by the conflict.
_CONFLICT_LOCATION_TO_IATA: dict = {
    # Middle East
    "IRAN": ["THR", "IKA", "MHD"], "TEHRAN": ["THR", "IKA"],
    "IRAQ": ["BGW", "BSR", "NJF"], "BAGHDAD": ["BGW"], "BASRA": ["BSR"],
    "SYRIA": ["DAM"], "DAMASCUS": ["DAM"],
    "LEBANON": ["BEY"], "BEIRUT": ["BEY"],
    "YEMEN": ["SAH", "ADE"],
    "ISRAEL": ["TLV"], "TEL AVIV": ["TLV"], "GAZA": ["TLV"],
    "JORDAN": ["AMM"], "AMMAN": ["AMM"],
    "PERSIAN GULF": ["DXB", "DOH", "AUH", "BAH", "KWI", "MCT"],
    "GULF": ["DXB", "DOH", "AUH", "BAH", "KWI"],
    "STRAIT OF HORMUZ": ["THR", "DXB", "DOH", "MCT"],
    "RED SEA": ["SAH", "JED", "CAI"],
    "SAUDI ARABIA": ["RUH", "JED", "DMM"], "RIYADH": ["RUH"], "JEDDAH": ["JED"],
    "KUWAIT": ["KWI"], "BAHRAIN": ["BAH"], "OMAN": ["MCT"],
    "UAE": ["DXB", "AUH", "SHJ"], "DUBAI": ["DXB"],
    "DOHA": ["DOH"], "QATAR": ["DOH"],
    # Eastern Europe
    "UKRAINE": ["KBP", "HRK", "ODS"], "KYIV": ["KBP"], "KHARKIV": ["HRK"],
    "ODESSA": ["ODS"],
    "RUSSIA": ["SVO", "LED", "SVX"], "MOSCOW": ["SVO"],
    "BLACK SEA": ["KBP", "ODS"], "CRIMEA": ["KBP", "ODS"],
    # South Asia
    "PAKISTAN": ["KHI", "ISB", "LHE"], "KARACHI": ["KHI"],
    "ISLAMABAD": ["ISB"], "LAHORE": ["LHE"],
    "AFGHANISTAN": ["KBL"], "KABUL": ["KBL"],
    "KASHMIR": ["SXR", "ISB"],
    "INDIA": ["DEL", "BOM", "MAA", "CCU"],
    # North Africa
    "LIBYA": ["TIP", "BEN"], "TRIPOLI": ["TIP"],
    "SUDAN": ["KRT"], "ETHIOPIA": ["ADD"], "SOMALIA": ["MGQ"],
}

# Pre-compiled (pattern, airports) pairs so the regexes are built only once.
_CONFLICT_LOCATION_PATTERNS = [
    (re.compile(_keyword_regex(keyword), re.IGNORECASE), tuple(iatas))
    for keyword, iatas in _CONFLICT_LOCATION_TO_IATA.items()
]


def get_conflict_zone_airports(lookback_days: int = 90,
                               min_severity: str = "Medium") -> frozenset:
    """
    Build a dynamic set of conflict-zone IATA codes from recent high-severity
    conflict events in data/base/conflict_events.csv.

    Parameters
    ----------
    lookback_days : int
        Only consider events within this many days of today (default 90).
    min_severity : str
        Minimum event severity to include ("Low", "Medium", "High", "Critical").
        Matched case-insensitively; an unrecognised value is treated as "Medium".

    Returns
    -------
    frozenset of IATA airport codes that are currently in active conflict zones.
    The hardcoded baseline set is always included, and is returned on its own
    if the CSV is empty, lacks the date/severity/location columns, or has no
    recent events at or above the requested severity.
    """
    _SEVERITY_ORDER = {"Low": 1, "Minimal": 0, "Medium": 2, "Moderate": 2,
                       "High": 3, "Severe": 4, "Critical": 4}
    # Normalise the argument the same way row values are normalised below.
    min_sev_val = _SEVERITY_ORDER.get(str(min_severity).strip().title(), 2)

    # Always include this baseline set (conflict zones we know about a priori)
    _BASELINE = frozenset([
        "TLV", "AMM", "BEY", "BGW", "DAM", "THR", "IKA",  # Middle East
        "KBP", "HRK", "ODS",                               # Ukraine
        "KHI", "ISB", "LHE", "KBL",                        # South Asia
        "SAH", "TIP", "KRT",                               # Africa conflict
    ])

    conflict_df = load_csv_safe(BASE_DIR / "conflict_events.csv")
    if conflict_df.empty:
        logger.info("conflict_events.csv empty — using baseline conflict airports")
        return _BASELINE

    missing = {"date", "severity", "location"} - set(conflict_df.columns)
    if missing:
        logger.warning(
            "conflict_events.csv missing columns %s — using baseline conflict airports",
            sorted(missing),
        )
        return _BASELINE

    conflict_df["date"] = pd.to_datetime(conflict_df["date"], errors="coerce")
    cutoff = pd.Timestamp.now() - pd.Timedelta(days=lookback_days)
    recent = conflict_df[conflict_df["date"] >= cutoff].copy()

    if recent.empty:
        logger.info("No recent conflict events — using baseline conflict airports")
        return _BASELINE

    # Filter by severity (unknown labels score 0)
    recent["_sev_val"] = recent["severity"].map(
        lambda s: _SEVERITY_ORDER.get(str(s).strip().title(), 0))
    recent = recent[recent["_sev_val"] >= min_sev_val]

    if recent.empty:
        return _BASELINE

    # Extract IATA codes from location strings.  astype(str) keeps this
    # working when the column was parsed as a non-string dtype.
    dynamic_airports: set = set()
    for loc in recent["location"].dropna().astype(str).unique():
        for pattern, iatas in _CONFLICT_LOCATION_PATTERNS:
            if pattern.search(loc):
                dynamic_airports.update(iatas)

    result = _BASELINE | frozenset(dynamic_airports)
    logger.info(
        "Dynamic conflict-zone airports (%d recent events): %d airports — %s",
        len(recent), len(result), sorted(result),
    )
    return result


# ── Loaders for each base CSV ─────────────────────────────────────────────────

def load_flight_cancellations() -> pd.DataFrame:
    """
    Load and normalise flight_cancellations.csv.
    Adds: iata_code (from origin), country (from origin_country, else
    "Unknown"), cancellation_flag=1.
    """
    df = load_csv_safe(BASE_DIR / "flight_cancellations.csv")
    if df.empty:
        return df
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["cancellation_flag"] = 1
    # Use origin as the airport reference — normalise city names to IATA codes
    df["iata_code"] = df["origin"].apply(normalise_iata)
    df["country"] = df.get("origin_country", "Unknown")
    return df


def load_airport_disruptions() -> pd.DataFrame:
    """
    Load and normalise airport_disruptions.csv.
    Encodes severity_level → numeric disruption_severity (0-4); labels not in
    SEVERITY_MAP default to 2.
    """
    df = load_csv_safe(BASE_DIR / "airport_disruptions.csv")
    if df.empty:
        return df
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["disruption_severity"] = _encode_severity(df["severity_level"])
    df["iata_code"] = df["iata_code"].str.strip().str.upper()
    return df


def load_airspace_closures() -> pd.DataFrame:
    """
    Load and normalise airspace_closures.csv.
    Derives airspace_risk_score from duration + flights_affected.
    Adds: date column from closure_start_date, closure_end from
    closure_end_date.
    """
    df = load_csv_safe(BASE_DIR / "airspace_closures.csv")
    if df.empty:
        return df
    df["date"] = pd.to_datetime(df["closure_start_date"], errors="coerce")
    df["closure_end"] = pd.to_datetime(df["closure_end_date"], errors="coerce")

    has_duration = "duration_hours" in df.columns
    has_flights = "flights_affected" in df.columns
    df["duration_hours"] = _numeric_column(df, "duration_hours")
    df["flights_affected"] = _numeric_column(df, "flights_affected")

    # Airspace risk score 0-4: based on duration and flights affected.
    # Maxima are taken AFTER numeric coercion so string/NaN cells cannot
    # break the arithmetic; 168 / 500 are the fallbacks for absent columns.
    max_dur = df["duration_hours"].max() if has_duration else 168
    max_flt = df["flights_affected"].max() if has_flights else 500
    df["airspace_risk_score"] = (
        (df["duration_hours"] / (max_dur + 1)) * 2
        + (df["flights_affected"] / (max_flt + 1)) * 2
    ).clip(0, 4).round(2)
    return df


def load_conflict_events() -> pd.DataFrame:
    """
    Load and normalise conflict_events.csv.
    Encodes severity + aviation_impact → numeric conflict_intensity
    (0.6 * severity/4 + 0.4 * aviation_impact/4) and infers region from
    the location text.
    """
    df = load_csv_safe(BASE_DIR / "conflict_events.csv")
    if df.empty:
        return df
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["severity_num"] = _encode_severity(df["severity"])
    df["aviation_impact_num"] = df["aviation_impact"].apply(_encode_aviation_impact)
    df["conflict_intensity"] = (
        (df["severity_num"] / 4) * 0.6 + (df["aviation_impact_num"] / 4) * 0.4
    ).round(4)
    # Extract region from location
    df["region"] = df["location"].apply(_infer_region)
    return df


def load_flight_reroutes() -> pd.DataFrame:
    """
    Load and normalise flight_reroutes.csv.
    Coerces delay_hours and extra_fuel_cost_usd to numeric (0 when missing)
    and adds iata_code from origin.
    """
    df = load_csv_safe(BASE_DIR / "flight_reroutes.csv")
    if df.empty:
        return df
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["extra_fuel_cost_usd"] = _numeric_column(df, "extra_fuel_cost_usd")
    df["delay_hours"] = _numeric_column(df, "delay_hours")
    df["iata_code"] = df["origin"].apply(normalise_iata)
    return df


def load_airline_losses() -> pd.DataFrame:
    """Load airline_losses.csv for airline exposure scoring.

    airline_exposure_score = estimated_loss_usd / (max loss + 1) * 100.
    """
    df = load_csv_safe(BASE_DIR / "airline_losses.csv")
    if df.empty:
        return df
    df["estimated_loss_usd"] = _numeric_column(df, "estimated_loss_usd")
    max_loss = df["estimated_loss_usd"].max()
    df["airline_exposure_score"] = (
        df["estimated_loss_usd"] / (max_loss + 1) * 100
    ).round(2)
    return df


def load_oil_prices() -> pd.DataFrame:
    """Load oil_prices.csv from data/base/ (growing daily via yfinance).

    Falls back to data/derived/oil_prices.csv when the base file is empty.
    Guarantees an oil_price_change_pct column and, when any known price
    column exists, a canonical oil_price column.
    """
    df = load_csv_safe(BASE_DIR / "oil_prices.csv")
    if df.empty:
        logger.warning("data/base/oil_prices.csv not found — using fallback from data/derived/")
        df = load_csv_safe(DERIVED_DIR / "oil_prices.csv")
    if df.empty:
        return df
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.sort_values("date")

    # support both column naming conventions
    price_col = next(
        (c for c in ["brent_usd", "brent_price_usd", "oil_price"] if c in df.columns),
        None,
    )
    # An existing change column always wins; it must not be overwritten with
    # zeros just because no price column was recognised.
    if "oil_price_change_pct" in df.columns:
        df["oil_price_change_pct"] = df["oil_price_change_pct"].fillna(0)
    elif price_col is not None:
        df["oil_price_change_pct"] = df[price_col].pct_change().fillna(0) * 100
    else:
        df["oil_price_change_pct"] = 0.0

    # normalise to canonical "oil_price" column
    if "oil_price" not in df.columns and price_col:
        df["oil_price"] = df[price_col]
    return df


def load_sentiment() -> pd.DataFrame:
    """Load sentiment.csv from data/base/ (growing via GDELT).

    Falls back to data/derived/sentiment.csv when the base file is empty and
    parses the timestamp column to datetime.
    """
    df = load_csv_safe(BASE_DIR / "sentiment.csv")
    if df.empty:
        logger.warning("data/base/sentiment.csv not found — using fallback from data/derived/")
        df = load_csv_safe(DERIVED_DIR / "sentiment.csv")
    if not df.empty:
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    return df


def load_flight_prices() -> pd.DataFrame:
    """Load flight_prices.csv from data/base/ (growing via SerpApi).

    The timestamp column is parsed from 'timestamp', or from 'date' when
    'timestamp' is absent.
    """
    df = load_csv_safe(BASE_DIR / "flight_prices.csv")
    if not df.empty:
        df["timestamp"] = pd.to_datetime(
            df.get("timestamp", df.get("date")), errors="coerce")
    return df


# ── Region inference helper ───────────────────────────────────────────────────

_REGION_KEYWORDS = {
    "Middle East": ["iran", "iraq", "israel", "gaza", "yemen", "syria", "uae",
                    "dubai", "tehran", "beirut", "jordan", "saudi", "bahrain",
                    "qatar", "kuwait", "oman"],
    "Eastern Europe": ["ukraine", "russia", "kyiv", "moscow", "poland", "romania",
                       "moldova", "belarus", "donbas"],
    "South Asia": ["pakistan", "india", "afghanistan", "karachi", "delhi",
                   "kabul", "lahore"],
    "Central Asia": ["kazakhstan", "uzbekistan", "tajikistan", "turkmenistan"],
    "Africa": ["ethiopia", "sudan", "somalia", "libya", "mali", "niger",
               "nigeria", "eritrea"],
    "Western Europe": ["france", "germany", "uk", "london", "paris", "brussels",
                       "netherlands"],
    "North America": ["usa", "united states", "canada", "mexico"],
    "Asia-Pacific": ["china", "japan", "korea", "taiwan", "philippines",
                     "vietnam", "myanmar"],
}

# One compiled alternation per region, kept in _REGION_KEYWORDS order because
# the first matching region wins.
_REGION_PATTERNS = {
    region: re.compile("|".join(_keyword_regex(kw) for kw in keywords), re.IGNORECASE)
    for region, keywords in _REGION_KEYWORDS.items()
}


def _infer_region(location: str) -> str:
    """Return the first region whose keyword starts a word in *location*.

    Regions are tried in _REGION_KEYWORDS order; "Global" is returned when
    nothing matches.  Word-start matching (instead of a raw substring test)
    keeps "Romania" out of the Middle East ("oman") and "Jerusalem" out of
    North America ("usa").
    """
    loc = str(location)
    for region, pattern in _REGION_PATTERNS.items():
        if pattern.search(loc):
            return region
    return "Global"


# ── Aggregation builders ──────────────────────────────────────────────────────

def _expand_closures_by_day(airspace_df: pd.DataFrame) -> pd.DataFrame:
    """Expand each airspace closure into one (date, country, score) row per day.

    A closure spans start..end inclusive.  When the end date is missing or
    precedes the start, the end is derived from duration_hours (24 if the
    column is absent).  Rows without a start date are skipped.
    """
    columns = ["date", "country", "airspace_risk_score"]
    expanded_rows = []
    for _, row in airspace_df.iterrows():
        start = row.get("date")
        if pd.isna(start):
            continue
        start = pd.to_datetime(start).normalize()

        end = row.get("closure_end")
        end = pd.to_datetime(end).normalize() if pd.notna(end) else None
        if end is None or end < start:
            dur = float(row.get("duration_hours", 24))
            end = (start + pd.Timedelta(hours=dur)).normalize()

        country = row.get("country", "")
        score = row.get("airspace_risk_score", 0)
        expanded_rows.extend(
            {"date": day, "country": country, "airspace_risk_score": score}
            for day in pd.date_range(start, end, freq="D")
        )
    return pd.DataFrame(expanded_rows, columns=columns)


def build_airport_daily_features() -> pd.DataFrame:
    """
    Aggregate all base CSVs into a per-(airport, date) feature table.
    This is the primary input for classification feature engineering.

    Returns columns:
        date, iata_code, region, country,
        cancellation_count, cancellation_rate,
        disruption_severity, flights_affected, disruption_index,
        airspace_risk_score (from closures),
        avg_delay_hours, extra_fuel_cost_sum,
        conflict_event_count, conflict_intensity_max,
        oil_price, oil_price_change_pct,
        region_weight, is_high_disruption, timestamp, date_str

    An empty DataFrame is returned when neither disruption nor cancellation
    data is available.
    """
    # 1. Cancellations per airport per day
    cancel_df = load_flight_cancellations()
    if not cancel_df.empty:
        cancel_agg = (
            cancel_df.groupby(["date", "iata_code"])
            .agg(
                cancellation_count=("cancellation_flag", "sum"),
                passengers_affected=("passengers_affected", "sum"),
                country=("country", "first"),
            )
            .reset_index()
        )
    else:
        cancel_agg = pd.DataFrame(columns=[
            "date", "iata_code", "cancellation_count",
            "passengers_affected", "country"])

    # 2. Airport disruption severity per airport per day
    disrupt_df = load_airport_disruptions()
    if not disrupt_df.empty:
        disrupt_agg = (
            disrupt_df.groupby(["date", "iata_code"])
            .agg(
                disruption_severity=("disruption_severity", "max"),
                flights_affected=("flights_affected", "sum"),
                duration_hours=("duration_hours", "max"),
                region=("region", "first"),
                country=("country", "first"),
            )
            .reset_index()
        )
    else:
        disrupt_agg = pd.DataFrame(columns=[
            "date", "iata_code", "disruption_severity", "flights_affected",
            "duration_hours", "region", "country"])

    # 3. Reroutes (delays) per airport per day
    reroute_df = load_flight_reroutes()
    if not reroute_df.empty:
        reroute_agg = (
            reroute_df.groupby(["date", "iata_code"])
            .agg(
                avg_delay_hours=("delay_hours", "mean"),
                extra_fuel_cost_sum=("extra_fuel_cost_usd", "sum"),
                reroute_count=("flight_number", "count"),
            )
            .reset_index()
        )
    else:
        reroute_agg = pd.DataFrame(columns=[
            "date", "iata_code", "avg_delay_hours",
            "extra_fuel_cost_sum", "reroute_count"])

    # 4. Airspace risk per country per day (broadcast to airports by country).
    # Closures span multiple days; each row is expanded across its full date
    # range so that the date+country join actually hits airport records.
    airspace_agg = pd.DataFrame(columns=["date", "country", "airspace_risk_score"])
    airspace_df = load_airspace_closures()
    if not airspace_df.empty:
        exp_df = _expand_closures_by_day(airspace_df)
        if not exp_df.empty:
            airspace_agg = (
                exp_df.groupby(["date", "country"])
                .agg(airspace_risk_score=("airspace_risk_score", "max"))
                .reset_index()
            )

    # 5. Conflict events per region per day
    conflict_df = load_conflict_events()
    if not conflict_df.empty:
        conflict_agg = (
            conflict_df.groupby(["date", "region"])
            .agg(
                conflict_event_count=("event_type", "count"),
                conflict_intensity_max=("conflict_intensity", "max"),
            )
            .reset_index()
        )
    else:
        conflict_agg = pd.DataFrame(columns=[
            "date", "region", "conflict_event_count", "conflict_intensity_max"])

    # 6. Merge all on (date, iata_code)
    # Start from disruption (has most info), outer-join with cancellations
    if not disrupt_agg.empty and not cancel_agg.empty:
        merged = pd.merge(disrupt_agg, cancel_agg, on=["date", "iata_code"],
                          how="outer", suffixes=("_d", "_c"))
        # Coalesce country (disruption value preferred)
        merged["country"] = merged["country_d"].combine_first(merged["country_c"])
        merged.drop(columns=["country_d", "country_c"], errors="ignore", inplace=True)
    elif not disrupt_agg.empty:
        merged = disrupt_agg.copy()
        merged["cancellation_count"] = 0
    elif not cancel_agg.empty:
        merged = cancel_agg.copy()
        merged["disruption_severity"] = 0
        merged["flights_affected"] = 0
        merged["duration_hours"] = 0
    else:
        logger.warning("No airport disruption or cancellation data available")
        return pd.DataFrame()

    # Add reroutes
    if not reroute_agg.empty:
        merged = pd.merge(merged, reroute_agg, on=["date", "iata_code"], how="left")

    # 7. Infer region from country if missing.  The column starts out null
    # (not "Global") so the country→region map gets a chance to fill it.
    if "region" not in merged.columns:
        merged["region"] = None
    if merged["region"].isna().any():
        country_region_map = (
            disrupt_df[["country", "region"]].dropna().drop_duplicates()
            .set_index("country")["region"].to_dict()
            if not disrupt_df.empty else {}
        )
        merged["region"] = merged["region"].combine_first(
            merged["country"].map(country_region_map)
        ).fillna("Global")

    # 8. Join airspace risk by country + date
    if not airspace_agg.empty:
        merged["date"] = pd.to_datetime(merged["date"]).dt.normalize()
        airspace_agg["date"] = pd.to_datetime(airspace_agg["date"]).dt.normalize()
        merged = pd.merge(merged, airspace_agg, on=["date", "country"], how="left")

    # 9. Join conflict events by region + date
    if not conflict_agg.empty:
        merged = pd.merge(merged, conflict_agg, on=["date", "region"], how="left")

    # 10. Join oil prices by date (nearest quote within 7 days).
    # load_oil_prices() already exposes the canonical oil_price column.
    # Rows without a date are dropped here because merge_asof rejects null keys.
    oil_df = load_oil_prices()
    if not oil_df.empty:
        oil_df["date"] = pd.to_datetime(oil_df["date"])
        merged["date"] = pd.to_datetime(merged["date"])
        oil_cols = [c for c in ("date", "oil_price", "oil_price_change_pct")
                    if c in oil_df.columns]
        merged = pd.merge_asof(
            merged.dropna(subset=["date"]).sort_values("date"),
            oil_df[oil_cols].dropna(subset=["date"]).sort_values("date"),
            on="date",
            direction="nearest",
            tolerance=pd.Timedelta("7d"),
        )

    # 11. Fill numeric defaults
    num_defaults = {
        "cancellation_count": 0,
        "passengers_affected": 0,
        "disruption_severity": 0,
        "flights_affected": 0,
        "duration_hours": 0,
        "avg_delay_hours": 0,
        "extra_fuel_cost_sum": 0,
        "reroute_count": 0,
        "airspace_risk_score": 0,
        "conflict_event_count": 0,
        "conflict_intensity_max": 0,
        "oil_price": 85.0,
        "oil_price_change_pct": 0.0,
    }
    for col, val in num_defaults.items():
        if col not in merged.columns:
            merged[col] = val
        else:
            merged[col] = pd.to_numeric(merged[col], errors="coerce").fillna(val)

    # 12. Compute disruption_index from base data
    #     weights: severity 40, delay 30, airspace 20, cancellations 10
    merged["disruption_index"] = (
        (merged["disruption_severity"] / 4) * 40
        + (merged["avg_delay_hours"] / 24).clip(0, 1) * 30
        + (merged["airspace_risk_score"] / 4) * 20
        + (merged["cancellation_count"] / 50).clip(0, 1) * 10
    ).clip(0, 100).round(2)

    # 13. Compute cancellation_rate (per airport per day, as fraction of typical capacity)
    # Proxy: cancellation_count / (cancellation_count + flights_affected + 1)
    total = merged["cancellation_count"] + merged["flights_affected"] + 1
    merged["cancellation_rate"] = (merged["cancellation_count"] / total).round(4)

    # 14. Regional conflict weight
    merged["region_weight"] = merged["region"].map(REGION_CONFLICT_WEIGHT).fillna(0.8)

    # 15. Binary disruption target
    # Threshold 30/100 reflects "high disruption" given available base data;
    # with full feature stack (oil + airspace) the index will reach higher values
    merged["is_high_disruption"] = (merged["disruption_index"] > 30).astype(int)

    # 16. Add timestamp column (noon on each date); null dates stay null
    dates = pd.to_datetime(merged["date"])
    merged["timestamp"] = dates.dt.strftime("%Y-%m-%dT12:00:00")
    merged["date_str"] = dates.dt.strftime("%Y-%m-%d")

    logger.info("Airport daily features: %d rows (from real base data)", len(merged))
    return merged


def build_sentiment_daily() -> pd.DataFrame:
    """
    Aggregate sentiment.csv into daily regional averages.
    Returns: date, region, sentiment_score, sentiment_momentum, article_count

    article_count is the sum of the source column when present, otherwise the
    number of sentiment rows per (date, region).
    """
    df = load_sentiment()
    if df.empty:
        return pd.DataFrame()

    df["date"] = df["timestamp"].dt.date
    agg_dict = {
        "sentiment_score": ("sentiment_score", "mean"),
    }
    if "article_count" in df.columns:
        agg_dict["article_count"] = ("article_count", "sum")
    else:
        agg_dict["article_count"] = ("sentiment_score", "count")

    agg = df.groupby(["date", "region"]).agg(**agg_dict).reset_index()
    agg["date"] = pd.to_datetime(agg["date"])

    # sentiment_momentum = change versus the previous available day,
    # computed separately per region (0 for each region's first day)
    agg = agg.sort_values(["region", "date"])
    agg["sentiment_momentum"] = agg.groupby("region")["sentiment_score"].diff().fillna(0)
    return agg


def build_classification_input() -> pd.DataFrame:
    """
    Build the full ML-ready classification DataFrame from data/base/ CSVs.
    Merges airport daily features with sentiment and computes final feature set.

    Returns a DataFrame with CLASSIFIER_FEATURES + CLASSIFIER_TARGET columns
    (plus timestamp/date_str/iata_code/country/region identifiers), or an
    empty DataFrame when no airport data or no target column is available.
    """
    from config.settings import CLASSIFIER_FEATURES, CLASSIFIER_TARGET

    airport_df = build_airport_daily_features()
    if airport_df.empty:
        logger.error("No airport data available for classification input")
        return pd.DataFrame()

    # Merge sentiment by region + date (nearest day within 3 days)
    sentiment_df = build_sentiment_daily()
    if not sentiment_df.empty:
        airport_df["date_dt"] = pd.to_datetime(airport_df["date"])
        sentiment_df["date_dt"] = pd.to_datetime(sentiment_df["date"])
        airport_df = _merge_asof_nearest(
            airport_df,
            sentiment_df[["date_dt", "region", "sentiment_score", "sentiment_momentum"]],
            on="date_dt",
            tolerance="3d",
            by="region",
            suffixes=("", "_sent"),
        )
        # Only relevant if airport_df already carried sentiment columns:
        # keep the existing value and back-fill from the merged one.
        for col in ["sentiment_score", "sentiment_momentum"]:
            if f"{col}_sent" in airport_df.columns:
                airport_df[col] = airport_df[col].combine_first(airport_df[f"{col}_sent"])
                airport_df.drop(columns=[f"{col}_sent"], inplace=True, errors="ignore")

    # Rename to match CLASSIFIER_FEATURES (never clobber an existing column)
    rename_map = {
        "cancellation_rate": "cancellation_rate_24h",
        "avg_delay_hours": "avg_delay_24h",
        "disruption_index": "disruption_index_lag6h",
    }
    for old, new in rename_map.items():
        if old in airport_df.columns and new not in airport_df.columns:
            airport_df.rename(columns={old: new}, inplace=True)

    # Compute airport_stress_score if not present.  This must happen BEFORE
    # the default-fill loop: otherwise a listed-but-missing feature is
    # zero-filled first and the real score is never computed.
    if "airport_stress_score" not in airport_df.columns:
        airport_df["airport_stress_score"] = (
            _numeric_column(airport_df, "disruption_index_lag6h") * 0.5
            + _numeric_column(airport_df, "cancellation_rate_24h") * 100 * 0.3
            + _numeric_column(airport_df, "airspace_risk_score") / 4 * 100 * 0.2
        ).clip(0, 100).round(2)

    # Fill all required features with defaults
    for feat in CLASSIFIER_FEATURES:
        if feat not in airport_df.columns:
            airport_df[feat] = 0.0
        airport_df[feat] = pd.to_numeric(airport_df[feat], errors="coerce").fillna(0)

    if CLASSIFIER_TARGET not in airport_df.columns:
        logger.error("Classification target '%s' not found", CLASSIFIER_TARGET)
        return pd.DataFrame()

    result_cols = (
        ["timestamp", "date_str", "iata_code", "country", "region"]
        + CLASSIFIER_FEATURES
        + [CLASSIFIER_TARGET]
    )
    result_cols = [c for c in result_cols if c in airport_df.columns]
    result = airport_df[result_cols].dropna(subset=[CLASSIFIER_TARGET])

    logger.info("Classification input: %d rows | positive rate: %.1f%% (from REAL base data)",
                len(result), result[CLASSIFIER_TARGET].mean() * 100)
    return result


def build_regression_input() -> pd.DataFrame:
    """
    Build the ML-ready regression DataFrame for flight price prediction.

    Source: data/base/flight_prices.csv (growing via SerpApi).
    Falls back to data/derived/flight_prices.csv if base file is empty.

    Returns a DataFrame with REGRESSOR_FEATURES + REGRESSOR_TARGET columns
    (plus timestamp/route/origin/destination when present), or an empty
    DataFrame when no price data or no target column is available.
    """
    from config.settings import REGRESSOR_FEATURES, REGRESSOR_TARGET

    prices_df = load_flight_prices()
    if prices_df.empty:
        logger.warning("data/base/flight_prices.csv is empty — "
                       "falling back to derived prices (synthetic). "
                       "Run SerpApi ingestion to populate real price data.")
        prices_df = load_csv_safe(DERIVED_DIR / "flight_prices.csv")
    if prices_df.empty:
        return pd.DataFrame()

    # Single datetime key shared by both as-of joins below.
    raw_dates = prices_df.get("timestamp", prices_df.get("date"))
    if raw_dates is None:
        prices_df["date_dt"] = pd.NaT
    else:
        prices_df["date_dt"] = pd.to_datetime(raw_dates, errors="coerce")

    # Merge oil prices by date (nearest quote within 7 days).  The oil date is
    # renamed to the join key so it cannot collide with a 'date' column in the
    # price data, and oil columns already present in prices_df are not
    # duplicated into _x/_y pairs.
    oil_df = load_oil_prices()
    if not oil_df.empty:
        oil_df["date_dt"] = pd.to_datetime(oil_df["date"])
        oil_reg_cols = ["date_dt"] + [
            c for c in ("oil_price", "oil_price_change_pct")
            if c in oil_df.columns and c not in prices_df.columns
        ]
        if len(oil_reg_cols) > 1:
            prices_df = _merge_asof_nearest(prices_df, oil_df[oil_reg_cols], on="date_dt")
    if "oil_price" not in prices_df.columns:
        prices_df["oil_price"] = 85.0

    # Merge disruption index by date.  NOTE: the value joined is the mean
    # disruption_index across ALL airports on the nearest day (within 7 days),
    # not a per-route figure, despite the column name.
    airport_df = build_airport_daily_features()
    if not airport_df.empty and "route" in prices_df.columns:
        prices_df["origin"] = prices_df.get(
            "origin", prices_df["route"].str.split("-").str[0])
        route_disrupt = (
            airport_df.assign(date_dt=pd.to_datetime(airport_df["date"]))
            .groupby("date_dt")["disruption_index"].mean().reset_index()
            .rename(columns={"disruption_index": "disruption_index_route"})
        )
        prices_df = _merge_asof_nearest(prices_df, route_disrupt, on="date_dt")
        if "disruption_index" not in prices_df.columns:
            prices_df["disruption_index"] = prices_df.get("disruption_index_route", 0)

    # ── Dynamic route_conflict_flag (replaces static ingestion-time flag) ────
    # Recompute at pipeline run-time so the flag always reflects current
    # conflict data rather than the hardcoded set baked in at ingestion.
    # Deliberately best-effort: a failure here must not abort the pipeline.
    try:
        _conflict_airports = get_conflict_zone_airports(lookback_days=90,
                                                        min_severity="Medium")
        if _conflict_airports:
            conflict_codes = list(_conflict_airports)
            flagged = pd.Series(False, index=prices_df.index)
            for endpoint in ("origin", "destination"):
                if endpoint in prices_df.columns:
                    codes = prices_df[endpoint].astype(str).str.strip().str.upper()
                    flagged = flagged | codes.isin(conflict_codes)
            prices_df["route_conflict_flag"] = flagged.astype(int)
            n_flagged = int(prices_df["route_conflict_flag"].sum())
            logger.info(
                "Dynamic route_conflict_flag: %d / %d routes flagged as conflict-zone",
                n_flagged, len(prices_df),
            )
    except Exception as _rcf_err:
        logger.warning("Dynamic route_conflict_flag failed (non-fatal): %s", _rcf_err)

    # Fill required regression features
    for feat in REGRESSOR_FEATURES:
        if feat not in prices_df.columns:
            prices_df[feat] = 0.0
        prices_df[feat] = pd.to_numeric(prices_df[feat], errors="coerce").fillna(0)

    # Ensure target
    if REGRESSOR_TARGET not in prices_df.columns:
        logger.error("Regression target '%s' not found", REGRESSOR_TARGET)
        return pd.DataFrame()

    result_cols = (["timestamp", "route", "origin", "destination"]
                   + REGRESSOR_FEATURES + [REGRESSOR_TARGET])
    result_cols = [c for c in result_cols if c in prices_df.columns]
    result = prices_df[result_cols].dropna(subset=[REGRESSOR_TARGET])

    logger.info("Regression input: %d rows | price range $%.0f–$%.0f",
                len(result),
                result[REGRESSOR_TARGET].min(),
                result[REGRESSOR_TARGET].max())
    return result


# ── Convenience summary loader for dashboard ──────────────────────────────────

def load_all_base_summary() -> dict:
    """
    Return a dict of DataFrames for all base CSVs, for dashboard use.
    Avoids re-loading in multiple dashboard tabs.
    """
    return {
        "flight_cancellations": load_flight_cancellations(),
        "airport_disruptions": load_airport_disruptions(),
        "airspace_closures": load_airspace_closures(),
        "conflict_events": load_conflict_events(),
        "flight_reroutes": load_flight_reroutes(),
        "airline_losses": load_airline_losses(),
        "oil_prices": load_oil_prices(),
        "sentiment": load_sentiment(),
        "flight_prices": load_flight_prices(),
    }


if __name__ == "__main__":
    print("=== Base Loader Self-Test ===\n")

    print("Airport daily features:")
    airport = build_airport_daily_features()
    print(f"  {len(airport)} rows")
    if not airport.empty:
        print(airport[["date", "iata_code", "region", "disruption_index",
                       "cancellation_count", "is_high_disruption"]]
              .head(10).to_string(index=False))

    print("\nClassification input:")
    clf = build_classification_input()
    print(f"  {len(clf)} rows | positive rate: {clf['is_high_disruption'].mean():.1%}"
          if not clf.empty else "  empty")

    print("\nRegression input:")
    reg = build_regression_input()
    print(f"  {len(reg)} rows" if not reg.empty else "  empty (no flight_prices data yet)")