# Source-control header (GitHub paste artifact): bhanug2026 — Initial commit 47c6cfd
"""
src/processing/base_loader.py
==============================
SOURCE-OF-TRUTH READER for all data/base/ CSVs.
This module is the single entry-point for feature engineering.
It reads the real Kaggle historical CSVs + growing live-appended base files
and produces ML-ready DataFrames for classification and regression.
Architecture rule:
- data/base/ → read here, appended by ingestion modules
- data/derived/ → written here (regenerated each run)
- data/raw/ → never read for features; audit trail only
Base CSV → Feature mapping:
flight_cancellations.csv → cancellation_rate, is_cancelled per airport/date
airport_disruptions.csv → disruption_index, severity, flights_affected
airspace_closures.csv → airspace_risk_score per country/date
conflict_events.csv → conflict_event_count, conflict_intensity per region/date
flight_reroutes.csv → avg_delay_hours, extra_fuel_cost per route/date
airline_losses.csv → airline_exposure_score per airline
oil_prices.csv → oil_price, oil_price_change_pct (growing)
sentiment.csv → sentiment_score, sentiment_momentum (growing)
flight_prices.csv → price_usd per route/date (growing, for regression)
"""
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
from src.utils.logger import get_logger
from src.utils.io_utils import load_csv_safe
from config.settings import BASE_DIR, DERIVED_DIR
logger = get_logger(__name__)
# ── Severity / Risk Encoding ──────────────────────────────────────────────────
# Textual severity labels → shared ordinal 0-4 scale. Used by both
# load_airport_disruptions (severity_level) and load_conflict_events (severity).
SEVERITY_MAP = {
    "Critical": 4, "Severe": 4,
    "High": 3,
    "Moderate": 2, "Medium": 2,
    "Low": 1, "Minor": 1,
    "Minimal": 0,
}
# aviation_impact free text → ordinal 0-4. load_conflict_events matches these
# keys as case-insensitive substrings of the CSV value, iterating in insertion
# order — keep the long descriptive keys listed with their short counterparts.
AVIATION_IMPACT_MAP = {
    "Severe — major flight disruptions and airspace closures": 4,
    "Severe": 4,
    "High — significant flight cancellations and rerouting": 3,
    "High": 3,
    "Moderate — some delays and precautionary rerouting": 2,
    "Moderate": 2,
    "Low — minor disruptions": 1,
    "Low": 1,
    "Minimal — early warning signs only": 0,
    "Minimal": 0,
    "None": 0,
}
# Per-region multiplier used for the region_weight feature in
# build_airport_daily_features (unknown regions default to 0.8 there).
REGION_CONFLICT_WEIGHT = {
    "Middle East": 1.5,
    "Eastern Europe": 1.3,
    "South Asia": 1.2,
    "Central Asia": 1.1,
    "Africa": 1.0,
    "Western Europe": 0.5,
    "North America": 0.4,
    "Asia-Pacific": 0.6,
    "Global": 0.8,
}
# ── City name → IATA code lookup ──────────────────────────────────────────────
# flight_cancellations.csv stores city names in the 'origin' column.
# This table normalises them (and the ICAO codes that sometimes appear there)
# to 3-letter IATA codes so they join correctly with airport_disruptions.csv,
# which already uses proper IATA codes.
CITY_TO_IATA: dict = {
    # Middle East
    "DUBAI": "DXB", "DOHA": "DOH", "ABU DHABI": "AUH", "RIYADH": "RUH",
    "JEDDAH": "JED", "KUWAIT CITY": "KWI", "MUSCAT": "MCT", "BAHRAIN": "BAH",
    "BEIRUT": "BEY", "AMMAN": "AMM", "TEL AVIV": "TLV", "TEHRAN": "THR",
    "BAGHDAD": "BGW", "BASRA": "BSR", "DAMASCUS": "DAM",
    # South Asia
    "KARACHI": "KHI", "ISLAMABAD": "ISB", "LAHORE": "LHE", "MUMBAI": "BOM",
    "DELHI": "DEL", "BANGALORE": "BLR", "CHENNAI": "MAA", "COLOMBO": "CMB",
    "DHAKA": "DAC", "KATHMANDU": "KTM",
    # Europe
    "LONDON": "LHR", "PARIS": "CDG", "FRANKFURT": "FRA", "AMSTERDAM": "AMS",
    "ISTANBUL": "IST", "MOSCOW": "SVO", "ROME": "FCO", "MADRID": "MAD",
    "ZURICH": "ZRH", "VIENNA": "VIE", "BRUSSELS": "BRU", "WARSAW": "WAW",
    "ATHENS": "ATH", "BUDAPEST": "BUD", "BUCHAREST": "OTP",
    # North Africa
    "CAIRO": "CAI", "CASABLANCA": "CMN", "TUNIS": "TUN", "ALGIERS": "ALG",
    "TRIPOLI": "TIP", "KHARTOUM": "KRT", "ADDIS ABABA": "ADD",
    # Asia Pacific
    "SINGAPORE": "SIN", "BANGKOK": "BKK", "HONG KONG": "HKG", "TOKYO": "NRT",
    "BEIJING": "PEK", "SHANGHAI": "PVG", "SEOUL": "ICN", "SYDNEY": "SYD",
    "MELBOURNE": "MEL", "KUALA LUMPUR": "KUL", "JAKARTA": "CGK",
    # Americas
    "NEW YORK": "JFK", "LOS ANGELES": "LAX", "WASHINGTON": "IAD",
    "CHICAGO": "ORD", "MIAMI": "MIA", "TORONTO": "YYZ", "SAO PAULO": "GRU",
    # Other
    "NAIROBI": "NBO", "JOHANNESBURG": "JNB", "ACCRA": "ACC",
    # ICAO codes sometimes appear in the origin field — map them too.
    "OMDB": "DXB",
    "OTHH": "DOH",
    "OPKC": "KHI",
    "OJAI": "AMM",
    "LLBG": "TLV",
    "OKBK": "KWI",
    "OBBI": "BAH",
    "OLBA": "BEY",
    "OJAM": "AMM",
    "OIII": "IKA",
    "ORBI": "BGW",
    "ORMM": "BSR",
    "OSDI": "DAM",
    "EGLL": "LHR",
    "LFPG": "CDG",
    "EDDF": "FRA",
    "EHAM": "AMS",
    "LTFM": "IST",
    "UUEE": "SVO",
}


def normalise_iata(value: str) -> str:
    """Convert city name or ICAO code to IATA code; pass through if already IATA."""
    key = str(value).strip().upper()
    if key in CITY_TO_IATA:
        return CITY_TO_IATA[key]
    return key
# ── Conflict-location → IATA airport lookup ───────────────────────────────────
# Maps keywords found in conflict_events.csv 'location' column to nearby
# major airports that would be affected by the conflict.
# Keys are matched as substrings of the upper-cased location text in
# get_conflict_zone_airports, so one location can hit several entries
# (e.g. "TRIPOLI, LIBYA" matches both "TRIPOLI" and "LIBYA").
_CONFLICT_LOCATION_TO_IATA: dict = {
    # Middle East
    "IRAN": ["THR", "IKA", "MHD"],
    "TEHRAN": ["THR", "IKA"],
    "IRAQ": ["BGW", "BSR", "NJF"],
    "BAGHDAD": ["BGW"],
    "BASRA": ["BSR"],
    "SYRIA": ["DAM"],
    "DAMASCUS": ["DAM"],
    "LEBANON": ["BEY"],
    "BEIRUT": ["BEY"],
    "YEMEN": ["SAH", "ADE"],
    "ISRAEL": ["TLV"],
    "TEL AVIV": ["TLV"],
    "GAZA": ["TLV"],
    "JORDAN": ["AMM"],
    "AMMAN": ["AMM"],
    "PERSIAN GULF": ["DXB", "DOH", "AUH", "BAH", "KWI", "MCT"],
    "GULF": ["DXB", "DOH", "AUH", "BAH", "KWI"],
    "STRAIT OF HORMUZ": ["THR", "DXB", "DOH", "MCT"],
    "RED SEA": ["SAH", "JED", "CAI"],
    "SAUDI ARABIA": ["RUH", "JED", "DMM"],
    "RIYADH": ["RUH"],
    "JEDDAH": ["JED"],
    "KUWAIT": ["KWI"],
    "BAHRAIN": ["BAH"],
    "OMAN": ["MCT"],
    "UAE": ["DXB", "AUH", "SHJ"],
    "DUBAI": ["DXB"],
    "DOHA": ["DOH"],
    "QATAR": ["DOH"],
    # Eastern Europe
    "UKRAINE": ["KBP", "HRK", "ODS"],
    "KYIV": ["KBP"],
    "KHARKIV": ["HRK"],
    "ODESSA": ["ODS"],
    "RUSSIA": ["SVO", "LED", "SVX"],
    "MOSCOW": ["SVO"],
    "BLACK SEA": ["KBP", "ODS"],
    "CRIMEA": ["KBP", "ODS"],
    # South Asia
    "PAKISTAN": ["KHI", "ISB", "LHE"],
    "KARACHI": ["KHI"],
    "ISLAMABAD": ["ISB"],
    "LAHORE": ["LHE"],
    "AFGHANISTAN": ["KBL"],
    "KABUL": ["KBL"],
    "KASHMIR": ["SXR", "ISB"],
    "INDIA": ["DEL", "BOM", "MAA", "CCU"],
    # North Africa
    "LIBYA": ["TIP", "BEN"],
    "TRIPOLI": ["TIP"],
    "SUDAN": ["KRT"],
    "ETHIOPIA": ["ADD"],
    "SOMALIA": ["MGQ"],
}
def get_conflict_zone_airports(lookback_days: int = 90,
                               min_severity: str = "Medium") -> frozenset:
    """
    Build a dynamic set of conflict-zone IATA codes from recent high-severity
    conflict events in data/base/conflict_events.csv.

    Parameters
    ----------
    lookback_days : int
        Only consider events within this many days of today (default 90).
    min_severity : str
        Minimum event severity to include ("Low", "Medium", "High", "Critical").
        Unknown labels fall back to the "Medium" threshold (2).

    Returns
    -------
    frozenset of IATA airport codes that are currently in active conflict zones.
    Falls back to a hardcoded baseline set if the CSV is empty or missing;
    the baseline is ALWAYS included in the result (dynamic codes only add to it).
    """
    # Local ordinal scale — intentionally separate from module-level
    # SEVERITY_MAP (same 0-4 range, slightly different label set).
    _SEVERITY_ORDER = {"Low": 1, "Minimal": 0, "Medium": 2, "Moderate": 2,
                       "High": 3, "Severe": 4, "Critical": 4}
    min_sev_val = _SEVERITY_ORDER.get(min_severity, 2)
    # Always include this baseline set (conflict zones we know about a priori)
    _BASELINE = frozenset([
        "TLV", "AMM", "BEY", "BGW", "DAM", "THR", "IKA",  # Middle East
        "KBP", "HRK", "ODS",                              # Ukraine
        "KHI", "ISB", "LHE", "KBL",                       # South Asia
        "SAH", "TIP", "KRT",                              # Africa conflict
    ])
    conflict_df = load_csv_safe(BASE_DIR / "conflict_events.csv")
    if conflict_df.empty:
        logger.info("conflict_events.csv empty — using baseline conflict airports")
        return _BASELINE
    # Unparseable dates become NaT and fail the >= cutoff filter below.
    conflict_df["date"] = pd.to_datetime(conflict_df["date"], errors="coerce")
    cutoff = pd.Timestamp.now() - pd.Timedelta(days=lookback_days)
    recent = conflict_df[conflict_df["date"] >= cutoff].copy()
    if recent.empty:
        logger.info("No recent conflict events — using baseline conflict airports")
        return _BASELINE
    # Filter by severity: .title() normalises case ("HIGH" → "High");
    # unrecognised labels score 0 and are dropped by the threshold.
    recent["_sev_val"] = recent["severity"].map(
        lambda s: _SEVERITY_ORDER.get(str(s).strip().title(), 0))
    recent = recent[recent["_sev_val"] >= min_sev_val]
    if recent.empty:
        return _BASELINE
    # Extract IATA codes from location strings via keyword substring match;
    # a single location can match multiple keywords (country + city).
    dynamic_airports: set = set()
    for loc in recent["location"].dropna().str.upper():
        for keyword, iatas in _CONFLICT_LOCATION_TO_IATA.items():
            if keyword in loc:
                dynamic_airports.update(iatas)
    result = _BASELINE | frozenset(dynamic_airports)
    logger.info(
        "Dynamic conflict-zone airports (%d recent events): %d airports — %s",
        len(recent), len(result), sorted(result),
    )
    return result
# ── Loaders for each base CSV ─────────────────────────────────────────────────
def load_flight_cancellations() -> pd.DataFrame:
    """
    Load and normalise flight_cancellations.csv.

    Adds iata_code (origin city/ICAO normalised to IATA), a constant
    cancellation_flag=1 per row, and a country column. Returns the empty
    DataFrame unchanged when the CSV is missing or empty.
    """
    cancellations = load_csv_safe(BASE_DIR / "flight_cancellations.csv")
    if cancellations.empty:
        return cancellations
    cancellations["date"] = pd.to_datetime(cancellations["date"], errors="coerce")
    # Every row in this file represents one cancelled flight.
    cancellations["cancellation_flag"] = 1
    # Use origin as the airport reference — normalise city names to IATA codes.
    cancellations["iata_code"] = cancellations["origin"].map(normalise_iata)
    cancellations["country"] = cancellations.get("origin_country", "Unknown")
    return cancellations
def load_airport_disruptions() -> pd.DataFrame:
    """
    Load and normalise airport_disruptions.csv.

    Encodes the textual severity_level into a numeric disruption_severity
    (0-4, defaulting to 2 for unknown labels) and upper-cases iata_code.
    """
    disruptions = load_csv_safe(BASE_DIR / "airport_disruptions.csv")
    if disruptions.empty:
        return disruptions
    disruptions["date"] = pd.to_datetime(disruptions["date"], errors="coerce")
    severity_numeric = disruptions["severity_level"].map(SEVERITY_MAP)
    disruptions["disruption_severity"] = severity_numeric.fillna(2)
    disruptions["iata_code"] = disruptions["iata_code"].str.strip().str.upper()
    return disruptions
def load_airspace_closures() -> pd.DataFrame:
"""
Load and normalise airspace_closures.csv.
Derives airspace_risk_score from duration + flights_affected.
Adds: date column from closure_start_date.
"""
df = load_csv_safe(BASE_DIR / "airspace_closures.csv")
if df.empty:
return df
df["date"] = pd.to_datetime(df["closure_start_date"], errors="coerce")
df["closure_end"] = pd.to_datetime(df["closure_end_date"], errors="coerce")
# Airspace risk score 0-4: based on duration and flights affected
max_dur = df["duration_hours"].max() if "duration_hours" in df.columns else 168
max_flt = df["flights_affected"].max() if "flights_affected" in df.columns else 500
df["duration_hours"] = pd.to_numeric(df.get("duration_hours", 0), errors="coerce").fillna(0)
df["flights_affected"] = pd.to_numeric(df.get("flights_affected", 0), errors="coerce").fillna(0)
df["airspace_risk_score"] = (
(df["duration_hours"] / (max_dur + 1)) * 2 +
(df["flights_affected"] / (max_flt + 1)) * 2
).clip(0, 4).round(2)
return df
def load_conflict_events() -> pd.DataFrame:
    """
    Load and normalise conflict_events.csv.

    Encodes severity + aviation_impact into a numeric conflict_intensity in
    [0, 1] (60% severity, 40% aviation impact) and infers a region from the
    free-text location column.
    """
    df = load_csv_safe(BASE_DIR / "conflict_events.csv")
    if df.empty:
        return df
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    # Unknown severity labels default to 2 ("Moderate").
    df["severity_num"] = df["severity"].map(SEVERITY_MAP).fillna(2)
    # Case-insensitive substring match of AVIATION_IMPACT_MAP keys against the
    # cell value; the first key (in dict insertion order) that appears inside
    # the value wins, and unmatched values default to 1 ("Low").
    df["aviation_impact_num"] = df["aviation_impact"].apply(
        lambda x: next((v for k, v in AVIATION_IMPACT_MAP.items()
                        if str(k).lower() in str(x).lower()), 1)
    )
    # Weighted blend of the two 0-4 scales, normalised to [0, 1].
    df["conflict_intensity"] = (
        (df["severity_num"] / 4) * 0.6 + (df["aviation_impact_num"] / 4) * 0.4
    ).round(4)
    # Extract region from location (keyword lookup; "Global" when no match).
    df["region"] = df["location"].apply(_infer_region)
    return df
def load_flight_reroutes() -> pd.DataFrame:
"""
Load and normalise flight_reroutes.csv.
Derives delay_hours, extra_fuel_cost per route/date.
"""
df = load_csv_safe(BASE_DIR / "flight_reroutes.csv")
if df.empty:
return df
df["date"] = pd.to_datetime(df["date"], errors="coerce")
df["extra_fuel_cost_usd"] = pd.to_numeric(df.get("extra_fuel_cost_usd", 0), errors="coerce").fillna(0)
df["delay_hours"] = pd.to_numeric(df.get("delay_hours", 0), errors="coerce").fillna(0)
df["iata_code"] = df["origin"].apply(normalise_iata)
return df
def load_airline_losses() -> pd.DataFrame:
    """Load airline_losses.csv and derive airline_exposure_score (0-100).

    Exposure is each airline's loss relative to the maximum loss in the file.
    Returns the empty DataFrame unchanged when the CSV is missing or empty.
    """
    df = load_csv_safe(BASE_DIR / "airline_losses.csv")
    if df.empty:
        return df
    # FIX: the old pd.to_numeric(df.get(..., 0), ...).fillna(0) crashed with
    # AttributeError when the column was missing (scalar has no .fillna).
    if "estimated_loss_usd" in df.columns:
        df["estimated_loss_usd"] = pd.to_numeric(
            df["estimated_loss_usd"], errors="coerce").fillna(0)
    else:
        df["estimated_loss_usd"] = 0.0
    max_loss = df["estimated_loss_usd"].max()
    # +1 in the denominator avoids division by zero when all losses are 0.
    df["airline_exposure_score"] = (
        df["estimated_loss_usd"] / (max_loss + 1) * 100
    ).round(2)
    return df
def load_oil_prices() -> pd.DataFrame:
"""Load oil_prices.csv from data/base/ (growing daily via yfinance)."""
df = load_csv_safe(BASE_DIR / "oil_prices.csv")
if df.empty:
logger.warning("data/base/oil_prices.csv not found — using fallback from data/derived/")
df = load_csv_safe(DERIVED_DIR / "oil_prices.csv")
if df.empty:
return df
df["date"] = pd.to_datetime(df["date"], errors="coerce")
df = df.sort_values("date")
# support both column naming conventions
price_col = next((c for c in ["brent_usd", "brent_price_usd", "oil_price"] if c in df.columns), None)
if price_col is None:
df["oil_price_change_pct"] = 0.0
elif "oil_price_change_pct" in df.columns:
df["oil_price_change_pct"] = df["oil_price_change_pct"].fillna(0)
else:
df["oil_price_change_pct"] = df[price_col].pct_change().fillna(0) * 100
# normalise to canonical "oil_price" column
if "oil_price" not in df.columns and price_col:
df["oil_price"] = df[price_col]
return df
def load_sentiment() -> pd.DataFrame:
    """Load sentiment.csv from data/base/ (growing via GDELT).

    Falls back to data/derived/sentiment.csv when the base file is empty;
    parses the timestamp column when any data is found.
    """
    sentiment = load_csv_safe(BASE_DIR / "sentiment.csv")
    if sentiment.empty:
        logger.warning("data/base/sentiment.csv not found — using fallback from data/derived/")
        sentiment = load_csv_safe(DERIVED_DIR / "sentiment.csv")
    if sentiment.empty:
        return sentiment
    sentiment["timestamp"] = pd.to_datetime(sentiment["timestamp"], errors="coerce")
    return sentiment
def load_flight_prices() -> pd.DataFrame:
    """Load flight_prices.csv from data/base/ (growing via SerpApi).

    Parses a timestamp column, falling back to the date column when the
    file has no timestamp.
    """
    prices = load_csv_safe(BASE_DIR / "flight_prices.csv")
    if prices.empty:
        return prices
    raw_ts = prices.get("timestamp", prices.get("date"))
    prices["timestamp"] = pd.to_datetime(raw_ts, errors="coerce")
    return prices
# ── Region inference helper ───────────────────────────────────────────────────
_REGION_KEYWORDS = {
"Middle East": ["iran", "iraq", "israel", "gaza", "yemen", "syria",
"uae", "dubai", "tehran", "beirut", "jordan", "saudi",
"bahrain", "qatar", "kuwait", "oman"],
"Eastern Europe": ["ukraine", "russia", "kyiv", "moscow", "poland",
"romania", "moldova", "belarus", "donbas"],
"South Asia": ["pakistan", "india", "afghanistan", "karachi", "delhi",
"kabul", "lahore"],
"Central Asia": ["kazakhstan", "uzbekistan", "tajikistan", "turkmenistan"],
"Africa": ["ethiopia", "sudan", "somalia", "libya", "mali", "niger",
"nigeria", "eritrea"],
"Western Europe": ["france", "germany", "uk", "london", "paris",
"brussels", "netherlands"],
"North America": ["usa", "united states", "canada", "mexico"],
"Asia-Pacific": ["china", "japan", "korea", "taiwan", "philippines",
"vietnam", "myanmar"],
}
def _infer_region(location: str) -> str:
loc = str(location).lower()
for region, keywords in _REGION_KEYWORDS.items():
if any(kw in loc for kw in keywords):
return region
return "Global"
# ── Aggregation builders ──────────────────────────────────────────────────────
def build_airport_daily_features() -> pd.DataFrame:
    """
    Aggregate all base CSVs into a per-(airport, date) feature table.
    This is the primary input for classification feature engineering.

    Returns columns:
        date, iata_code, region, country,
        cancellation_count, cancellation_rate,
        disruption_severity, flights_affected, disruption_index_raw,
        airspace_risk_score (from closures),
        avg_delay_hours, extra_fuel_cost_sum,
        conflict_event_count, conflict_intensity_max,
        oil_price, oil_price_change_pct

    Returns an empty DataFrame when neither disruption nor cancellation data
    is available.
    """
    # 1. Cancellations per airport per day
    cancel_df = load_flight_cancellations()
    if not cancel_df.empty:
        cancel_agg = (
            cancel_df.groupby(["date", "iata_code"])
            .agg(
                cancellation_count=("cancellation_flag", "sum"),
                passengers_affected=("passengers_affected", "sum"),
                country=("country", "first"),
            )
            .reset_index()
        )
    else:
        # Empty frame with the expected schema so the merges below still work.
        cancel_agg = pd.DataFrame(columns=["date", "iata_code", "cancellation_count",
                                           "passengers_affected", "country"])
    # 2. Airport disruption severity per airport per day
    disrupt_df = load_airport_disruptions()
    if not disrupt_df.empty:
        disrupt_agg = (
            disrupt_df.groupby(["date", "iata_code"])
            .agg(
                disruption_severity=("disruption_severity", "max"),
                flights_affected=("flights_affected", "sum"),
                duration_hours=("duration_hours", "max"),
                region=("region", "first"),
                country=("country", "first"),
            )
            .reset_index()
        )
    else:
        disrupt_agg = pd.DataFrame(columns=["date", "iata_code", "disruption_severity",
                                            "flights_affected", "duration_hours",
                                            "region", "country"])
    # 3. Reroutes (delays) per airport per day
    reroute_df = load_flight_reroutes()
    if not reroute_df.empty:
        reroute_agg = (
            reroute_df.groupby(["date", "iata_code"])
            .agg(
                avg_delay_hours=("delay_hours", "mean"),
                extra_fuel_cost_sum=("extra_fuel_cost_usd", "sum"),
                reroute_count=("flight_number", "count"),
            )
            .reset_index()
        )
    else:
        reroute_agg = pd.DataFrame(columns=["date", "iata_code", "avg_delay_hours",
                                            "extra_fuel_cost_sum", "reroute_count"])
    # 4. Airspace risk per country per day (broadcast to airports by country)
    # FIX: closures span multiple days; expand each row across its full date range
    # so that the date+country join actually hits airport_disruption records.
    airspace_df = load_airspace_closures()
    if not airspace_df.empty:
        expanded_rows = []
        for _, row in airspace_df.iterrows():
            # Normalise to midnight so joins on whole days line up.
            start = pd.to_datetime(row.get("date")).normalize() if pd.notna(row.get("date")) else None
            end = pd.to_datetime(row.get("closure_end")).normalize() if pd.notna(row.get("closure_end")) else None
            if start is None:
                continue
            # Missing/backwards end date: derive it from duration (default 24h).
            if end is None or end < start:
                dur = float(row.get("duration_hours", 24))
                end = (start + pd.Timedelta(hours=dur)).normalize()
            # One row per closure day so the daily join below can hit it.
            for d in pd.date_range(start, end, freq="D"):
                expanded_rows.append({
                    "date": d,
                    "country": row.get("country", ""),
                    "airspace_risk_score": row.get("airspace_risk_score", 0),
                })
        if expanded_rows:
            exp_df = pd.DataFrame(expanded_rows)
            # Worst (max) closure per country/day wins.
            airspace_agg = (
                exp_df.groupby(["date", "country"])
                .agg(airspace_risk_score=("airspace_risk_score", "max"))
                .reset_index()
            )
        else:
            airspace_agg = pd.DataFrame(columns=["date", "country", "airspace_risk_score"])
    else:
        airspace_agg = pd.DataFrame(columns=["date", "country", "airspace_risk_score"])
    # 5. Conflict events per region per day
    conflict_df = load_conflict_events()
    if not conflict_df.empty:
        conflict_agg = (
            conflict_df.groupby(["date", "region"])
            .agg(
                conflict_event_count=("event_type", "count"),
                conflict_intensity_max=("conflict_intensity", "max"),
            )
            .reset_index()
        )
    else:
        conflict_agg = pd.DataFrame(columns=["date", "region",
                                             "conflict_event_count", "conflict_intensity_max"])
    # 6. Merge all on (date, iata_code)
    # Start from disruption (has most info), outer-join with cancellations
    if not disrupt_agg.empty and not cancel_agg.empty:
        merged = pd.merge(disrupt_agg, cancel_agg,
                          on=["date", "iata_code"], how="outer",
                          suffixes=("_d", "_c"))
        # Coalesce country/region — prefer the disruption-side value.
        merged["country"] = merged["country_d"].combine_first(merged["country_c"])
        merged.drop(columns=["country_d", "country_c"], errors="ignore", inplace=True)
    elif not disrupt_agg.empty:
        merged = disrupt_agg.copy()
        merged["cancellation_count"] = 0
    elif not cancel_agg.empty:
        merged = cancel_agg.copy()
        merged["disruption_severity"] = 0
        merged["flights_affected"] = 0
        merged["duration_hours"] = 0
    else:
        logger.warning("No airport disruption or cancellation data available")
        return pd.DataFrame()
    # Add reroutes (left join: airports with no reroutes get NaN, filled later)
    if not reroute_agg.empty:
        merged = pd.merge(merged, reroute_agg, on=["date", "iata_code"], how="left")
    # 7. Infer region from country if missing
    if "region" not in merged.columns or merged["region"].isna().any():
        merged["region"] = merged.get("region", pd.Series("Global", index=merged.index))
        # country → region lookup built from the disruption file itself.
        country_region_map = (
            disrupt_df[["country", "region"]].drop_duplicates().set_index("country")["region"].to_dict()
            if not disrupt_df.empty else {}
        )
        # Existing non-null regions win; then the country lookup; then "Global".
        merged["region"] = merged["region"].combine_first(
            merged["country"].map(country_region_map)
        ).fillna("Global")
    # 8. Join airspace risk by country + date (dates normalised to midnight)
    if not airspace_agg.empty:
        merged["date"] = pd.to_datetime(merged["date"]).dt.normalize()
        airspace_agg["date"] = pd.to_datetime(airspace_agg["date"]).dt.normalize()
        merged = pd.merge(merged, airspace_agg, on=["date", "country"], how="left")
    # 9. Join conflict events by region + date
    if not conflict_agg.empty:
        merged = pd.merge(merged, conflict_agg, on=["date", "region"], how="left")
    # 10. Join oil prices by date (nearest match within 7 days; rows with an
    # unparseable date are dropped by merge_asof's sort requirement)
    oil_df = load_oil_prices()
    if not oil_df.empty:
        oil_df["date"] = pd.to_datetime(oil_df["date"])
        merged["date"] = pd.to_datetime(merged["date"])
        # ensure canonical oil_price column exists (handles brent_usd or brent_price_usd)
        oil_price_col = next((c for c in ["oil_price", "brent_usd", "brent_price_usd"] if c in oil_df.columns), None)
        if oil_price_col and "oil_price" not in oil_df.columns:
            oil_df["oil_price"] = oil_df[oil_price_col]
        oil_cols = ["date", "oil_price", "oil_price_change_pct"]
        oil_cols = [c for c in oil_cols if c in oil_df.columns]
        merged = pd.merge_asof(
            merged.dropna(subset=["date"]).sort_values("date"),
            oil_df[oil_cols].dropna(subset=["date"]).sort_values("date"),
            on="date",
            direction="nearest",
            tolerance=pd.Timedelta("7d"),
        )
    # 11. Fill numeric defaults — every feature column must exist and be numeric.
    num_defaults = {
        "cancellation_count": 0,
        "passengers_affected": 0,
        "disruption_severity": 0,
        "flights_affected": 0,
        "duration_hours": 0,
        "avg_delay_hours": 0,
        "extra_fuel_cost_sum": 0,
        "reroute_count": 0,
        "airspace_risk_score": 0,
        "conflict_event_count": 0,
        "conflict_intensity_max": 0,
        "oil_price": 85.0,          # fallback price when no oil data joined
        "oil_price_change_pct": 0.0,
    }
    for col, val in num_defaults.items():
        if col not in merged.columns:
            merged[col] = val
        else:
            merged[col] = pd.to_numeric(merged[col], errors="coerce").fillna(val)
    # 12. Compute disruption_index (0-100) from base data
    # NOTE(review): max_flights is computed but never used below — candidate
    # for removal (the index formula uses fixed scales instead).
    max_flights = merged["flights_affected"].max() or 1
    merged["disruption_index"] = (
        (merged["disruption_severity"] / 4) * 40 +
        (merged["avg_delay_hours"] / 24).clip(0, 1) * 30 +
        (merged["airspace_risk_score"] / 4) * 20 +
        (merged["cancellation_count"] / 50).clip(0, 1) * 10
    ).clip(0, 100).round(2)
    # 13. Compute cancellation_rate (per airport per day, as fraction of typical capacity)
    # Proxy: cancellation_count / (cancellation_count + flights_affected + 1)
    total = merged["cancellation_count"] + merged["flights_affected"] + 1
    merged["cancellation_rate"] = (merged["cancellation_count"] / total).round(4)
    # 14. Regional conflict weight (unknown regions default to 0.8)
    merged["region_weight"] = merged["region"].map(REGION_CONFLICT_WEIGHT).fillna(0.8)
    # 15. Binary disruption target
    # Threshold 30/100 reflects "high disruption" given available base data;
    # with full feature stack (oil + airspace) the index will reach higher values
    merged["is_high_disruption"] = (merged["disruption_index"] > 30).astype(int)
    # 16. Add timestamp column (noon on each date)
    merged["timestamp"] = (
        pd.to_datetime(merged["date"]).apply(
            lambda d: d.strftime("%Y-%m-%dT12:00:00") if pd.notna(d) else None
        )
    )
    merged["date_str"] = pd.to_datetime(merged["date"]).dt.strftime("%Y-%m-%d")
    logger.info("Airport daily features: %d rows (from real base data)", len(merged))
    return merged
def build_sentiment_daily() -> pd.DataFrame:
    """
    Aggregate sentiment.csv into daily regional averages.

    Returns: date, region, sentiment_score, sentiment_momentum, article_count.
    article_count falls back to a per-group row count when the source file
    lacks that column.
    """
    raw = load_sentiment()
    if raw.empty:
        return pd.DataFrame()
    raw["date"] = raw["timestamp"].dt.date
    # Named-aggregation spec; article_count falls back to counting rows.
    if "article_count" in raw.columns:
        count_spec = ("article_count", "sum")
    else:
        count_spec = ("sentiment_score", "count")
    daily = (
        raw.groupby(["date", "region"])
        .agg(sentiment_score=("sentiment_score", "mean"),
             article_count=count_spec)
        .reset_index()
    )
    daily["date"] = pd.to_datetime(daily["date"])
    # Momentum = change of the regional daily average versus the previous
    # observation for that region (first observation gets 0).
    daily = daily.sort_values(["region", "date"])
    daily["sentiment_momentum"] = (
        daily.groupby("region")["sentiment_score"].diff().fillna(0)
    )
    return daily
def build_classification_input() -> pd.DataFrame:
    """
    Build the full ML-ready classification DataFrame from data/base/ CSVs.

    Merges airport daily features with sentiment and computes the final
    feature set. Returns a DataFrame with CLASSIFIER_FEATURES +
    CLASSIFIER_TARGET columns (empty when no airport data is available).
    """
    # Function-scope import — NOTE(review): presumably to avoid an import
    # cycle with config.settings; confirm before moving to module level.
    from config.settings import CLASSIFIER_FEATURES, CLASSIFIER_TARGET
    airport_df = build_airport_daily_features()
    if airport_df.empty:
        logger.error("No airport data available for classification input")
        return pd.DataFrame()
    # Merge sentiment by region + date (nearest match within 3 days)
    sentiment_df = build_sentiment_daily()
    if not sentiment_df.empty:
        airport_df["date_dt"] = pd.to_datetime(airport_df["date"])
        sentiment_df["date_dt"] = pd.to_datetime(sentiment_df["date"])
        airport_df = pd.merge_asof(
            airport_df.sort_values("date_dt"),
            sentiment_df[["date_dt", "region", "sentiment_score", "sentiment_momentum"]].sort_values("date_dt"),
            on="date_dt",
            by="region",
            tolerance=pd.Timedelta("3d"),
            direction="nearest",
            suffixes=("", "_sent"),
        )
        # "_sent" suffixed columns only appear when airport_df already carried
        # a column of the same name; coalesce (left wins) and drop them.
        for col in ["sentiment_score", "sentiment_momentum"]:
            if f"{col}_sent" in airport_df.columns:
                airport_df[col] = airport_df[col].combine_first(airport_df[f"{col}_sent"])
                airport_df.drop(columns=[f"{col}_sent"], inplace=True, errors="ignore")
    # Rename to match CLASSIFIER_FEATURES
    # (identity entries are intentional no-ops that document the mapping).
    rename_map = {
        "airspace_risk_score": "airspace_risk_score",
        "cancellation_rate": "cancellation_rate_24h",
        "avg_delay_hours": "avg_delay_24h",
        "oil_price_change_pct": "oil_price_change_pct",
        "conflict_event_count": "conflict_event_count",
        "disruption_index": "disruption_index_lag6h",
    }
    for old, new in rename_map.items():
        if old in airport_df.columns and new not in airport_df.columns:
            airport_df.rename(columns={old: new}, inplace=True)
    # Fill all required features with defaults and force them numeric.
    for feat in CLASSIFIER_FEATURES:
        if feat not in airport_df.columns:
            airport_df[feat] = 0.0
        airport_df[feat] = pd.to_numeric(airport_df[feat], errors="coerce").fillna(0)
    # Compute airport_stress_score if not present: weighted blend of
    # disruption (50%), cancellations (30%) and airspace risk (20%), 0-100.
    if "airport_stress_score" not in airport_df.columns:
        airport_df["airport_stress_score"] = (
            airport_df.get("disruption_index_lag6h", 0) * 0.5 +
            airport_df.get("cancellation_rate_24h", 0) * 100 * 0.3 +
            airport_df.get("airspace_risk_score", 0) / 4 * 100 * 0.2
        ).clip(0, 100).round(2)
    result_cols = (
        ["timestamp", "date_str", "iata_code", "country", "region"] +
        CLASSIFIER_FEATURES + [CLASSIFIER_TARGET]
    )
    # Keep only columns that actually materialised.
    result_cols = [c for c in result_cols if c in airport_df.columns]
    result = airport_df[result_cols].dropna(subset=[CLASSIFIER_TARGET])
    logger.info("Classification input: %d rows | positive rate: %.1f%% (from REAL base data)",
                len(result), result[CLASSIFIER_TARGET].mean() * 100)
    return result
def build_regression_input() -> pd.DataFrame:
    """
    Build the ML-ready regression DataFrame for flight price prediction.

    Source: data/base/flight_prices.csv (growing via SerpApi).
    Falls back to data/derived/flight_prices.csv if base file is empty.
    Returns a DataFrame with REGRESSOR_FEATURES + REGRESSOR_TARGET columns
    (empty DataFrame when no price data or no target column exists).
    """
    # Function-scope import — NOTE(review): presumably to avoid an import
    # cycle with config.settings; confirm before moving to module level.
    from config.settings import REGRESSOR_FEATURES, REGRESSOR_TARGET
    prices_df = load_flight_prices()
    if prices_df.empty:
        logger.warning("data/base/flight_prices.csv is empty — "
                       "falling back to derived prices (synthetic). "
                       "Run SerpApi ingestion to populate real price data.")
        prices_df = load_csv_safe(DERIVED_DIR / "flight_prices.csv")
    if prices_df.empty:
        return pd.DataFrame()
    # Merge oil prices by date (nearest match within 7 days)
    oil_df = load_oil_prices()
    if not oil_df.empty:
        prices_df["date_dt"] = pd.to_datetime(
            prices_df.get("timestamp", prices_df.get("date")), errors="coerce")
        oil_df["date"] = pd.to_datetime(oil_df["date"])
        # ensure canonical oil_price column exists (handles brent_usd or brent_price_usd)
        oil_price_col = next((c for c in ["oil_price", "brent_usd", "brent_price_usd"] if c in oil_df.columns), None)
        if oil_price_col and "oil_price" not in oil_df.columns:
            oil_df["oil_price"] = oil_df[oil_price_col]
        oil_reg_cols = ["date", "oil_price", "oil_price_change_pct"]
        oil_reg_cols = [c for c in oil_reg_cols if c in oil_df.columns]
        prices_df = pd.merge_asof(
            prices_df.sort_values("date_dt"),
            oil_df[oil_reg_cols].sort_values("date"),
            left_on="date_dt", right_on="date",
            direction="nearest",
            tolerance=pd.Timedelta("7d"),
        )
    # Fallback price when no oil data joined at all.
    if "oil_price" not in prices_df.columns:
        prices_df["oil_price"] = 85.0
    # Merge disruption index by route/date (airport-level)
    airport_df = build_airport_daily_features()
    if not airport_df.empty and "route" in prices_df.columns:
        # Derive origin from "XXX-YYY" route strings when not already present.
        prices_df["origin"] = prices_df.get("origin", prices_df["route"].str.split("-").str[0])
        prices_df["date_dt2"] = pd.to_datetime(prices_df.get("timestamp", prices_df.get("date")), errors="coerce")
        airport_df["date_dt2"] = pd.to_datetime(airport_df["date"])
        # NOTE(review): this is a global (all-airport) daily mean rather than
        # a true per-route index — the column name reflects intent only.
        route_disrupt = (
            airport_df.groupby("date_dt2")
            ["disruption_index"].mean().reset_index()
            .rename(columns={"disruption_index": "disruption_index_route"})
        )
        prices_df = pd.merge_asof(
            prices_df.sort_values("date_dt2"),
            route_disrupt.sort_values("date_dt2"),
            on="date_dt2",
            direction="nearest",
            tolerance=pd.Timedelta("7d"),
        )
        if "disruption_index" not in prices_df.columns:
            prices_df["disruption_index"] = prices_df.get("disruption_index_route", 0)
    # ── Dynamic route_conflict_flag (replaces static ingestion-time flag) ────────
    # Recompute at pipeline run-time so the flag always reflects current
    # conflict data rather than the hardcoded set baked in at ingestion.
    try:
        _conflict_airports = get_conflict_zone_airports(lookback_days=90, min_severity="Medium")
        if _conflict_airports:
            def _is_conflict_route(row) -> int:
                # 1 when either endpoint sits in a conflict zone, else 0.
                orig = str(row.get("origin", "")).strip().upper()
                dest = str(row.get("destination", "")).strip().upper()
                return int(orig in _conflict_airports or dest in _conflict_airports)
            prices_df["route_conflict_flag"] = prices_df.apply(_is_conflict_route, axis=1)
            n_flagged = int(prices_df["route_conflict_flag"].sum())
            logger.info(
                "Dynamic route_conflict_flag: %d / %d routes flagged as conflict-zone",
                n_flagged, len(prices_df),
            )
    except Exception as _rcf_err:
        # Best-effort: on failure, keep whatever flag came from ingestion.
        logger.warning("Dynamic route_conflict_flag failed (non-fatal): %s", _rcf_err)
    # Fill required regression features and force them numeric.
    for feat in REGRESSOR_FEATURES:
        if feat not in prices_df.columns:
            prices_df[feat] = 0.0
        prices_df[feat] = pd.to_numeric(prices_df[feat], errors="coerce").fillna(0)
    # Ensure target
    # NOTE(review): target_col is assigned but never used — candidate for removal.
    target_col = prices_df.get(REGRESSOR_TARGET)
    if REGRESSOR_TARGET not in prices_df.columns:
        logger.error("Regression target '%s' not found", REGRESSOR_TARGET)
        return pd.DataFrame()
    result_cols = (["timestamp", "route", "origin", "destination"] +
                   REGRESSOR_FEATURES + [REGRESSOR_TARGET])
    result_cols = [c for c in result_cols if c in prices_df.columns]
    result = prices_df[result_cols].dropna(subset=[REGRESSOR_TARGET])
    logger.info("Regression input: %d rows | price range $%.0f–$%.0f",
                len(result), result[REGRESSOR_TARGET].min(), result[REGRESSOR_TARGET].max())
    return result
# ── Convenience summary loader for dashboard ──────────────────────────────────
def load_all_base_summary() -> dict:
    """
    Return a dict of DataFrames for all base CSVs, for dashboard use.

    Loading everything once here avoids re-reading the CSVs in each
    dashboard tab.
    """
    loaders = {
        "flight_cancellations": load_flight_cancellations,
        "airport_disruptions": load_airport_disruptions,
        "airspace_closures": load_airspace_closures,
        "conflict_events": load_conflict_events,
        "flight_reroutes": load_flight_reroutes,
        "airline_losses": load_airline_losses,
        "oil_prices": load_oil_prices,
        "sentiment": load_sentiment,
        "flight_prices": load_flight_prices,
    }
    return {name: loader() for name, loader in loaders.items()}
if __name__ == "__main__":
    # Manual smoke test: build each ML input from whatever base CSVs exist
    # and print a short summary (no files are written here).
    print("=== Base Loader Self-Test ===\n")
    print("Airport daily features:")
    airport = build_airport_daily_features()
    print(f" {len(airport)} rows")
    if not airport.empty:
        print(airport[["date", "iata_code", "region", "disruption_index",
                       "cancellation_count", "is_high_disruption"]].head(10).to_string(index=False))
    print("\nClassification input:")
    clf = build_classification_input()
    print(f" {len(clf)} rows | positive rate: {clf['is_high_disruption'].mean():.1%}" if not clf.empty else " empty")
    print("\nRegression input:")
    reg = build_regression_input()
    print(f" {len(reg)} rows" if not reg.empty else " empty (no flight_prices data yet)")