# cyclone-pred-api / src / data_loader.py
# Author: clarindasusan — "Create data_loader.py" (commit 7042517, verified)
"""
CycloneDataLoader
=================
Joins the four cyclone CSVs into a single training-ready DataFrame.
Handles spatial nearest-neighbour merging for SST, moisture, and shear.
"""
import pandas as pd
import numpy as np
from scipy.spatial import cKDTree
from pathlib import Path
def _nearest_merge(
base: pd.DataFrame,
aux: pd.DataFrame,
aux_cols: list[str],
aux_lat: str = "latitude",
aux_lon: str = "longitude",
) -> pd.DataFrame:
"""
For each row in `base`, find the spatially nearest row in `aux`
and attach aux_cols. Uses a cKDTree for efficiency.
"""
tree = cKDTree(aux[[aux_lat, aux_lon]].values)
dists, idxs = tree.query(base[["latitude", "longitude"]].values)
for col in aux_cols:
base[col] = aux[col].iloc[idxs].values
return base
def load_cyclone_training_data(data_dir: str = "data") -> pd.DataFrame:
    """
    Build a training-ready DataFrame from the four raw cyclone CSVs.

    Joins sea-surface temperature, atmospheric moisture, and wind shear
    onto the cyclone track table via spatial nearest-neighbour matching,
    then synthesises a `risk_score` label in [0, 1].

    Columns produced (matching CYCLONE_FEATURES exactly):
        wind_speed_kmh, central_pressure_hpa, sea_surface_temp_c,
        track_curvature, distance_to_coast_km, storm_surge_potential,
        atmospheric_moisture, shear_index
    plus risk_score, date, latitude, longitude, basin, storm_type.

    Parameters
    ----------
    data_dir : str
        Directory containing cyclone_tracks_clean.csv,
        sea_surface_temp.csv, atmospheric_moisture.csv, wind_shear.csv.

    Returns
    -------
    pd.DataFrame
        One row per track observation, NaN-free in all model features.

    Raises
    ------
    ValueError
        If any required feature column is missing after the merges.
    """
    p = Path(data_dir)

    # ── Load raw tables ────────────────────────────────────────────────────
    tracks = pd.read_csv(p / "cyclone_tracks_clean.csv", parse_dates=["date"])
    sst = pd.read_csv(p / "sea_surface_temp.csv")
    moist = pd.read_csv(p / "atmospheric_moisture.csv", parse_dates=["date"])
    shear = pd.read_csv(p / "wind_shear.csv")

    # ── Normalise column names to lowercase ───────────────────────────────
    for df in (tracks, sst, moist, shear):
        df.columns = df.columns.str.lower().str.strip()

    # ── Base: use tracks as spine ──────────────────────────────────────────
    # NOTE: the track CSV's feature columns already match the model schema
    # exactly (the previous identity `rename` here was a no-op and has been
    # removed). The `required` check below still catches any schema drift.
    df = tracks.copy()

    # ── Merge SST (spatial nearest) ───────────────────────────────────────
    # SST has time_index — match on spatial proximity only (SST changes slowly)
    df = _nearest_merge(df, sst, aux_cols=["sea_surface_temp_c"])

    # ── Merge atmospheric moisture (spatial nearest) ───────────────────────
    df = _nearest_merge(df, moist, aux_cols=["atmospheric_moisture"])

    # ── Merge shear (spatial nearest) ─────────────────────────────────────
    df = _nearest_merge(df, shear, aux_cols=["shear_index"])

    # ── Validate all features present ─────────────────────────────────────
    required = [
        "wind_speed_kmh", "central_pressure_hpa", "sea_surface_temp_c",
        "track_curvature", "distance_to_coast_km", "storm_surge_potential",
        "atmospheric_moisture", "shear_index",
    ]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Missing columns after merge: {missing}")

    # ── Synthesise risk label ──────────────────────────────────────────────
    # Combines intensity + proximity + surge into a [0,1] score.
    # Replace this with ground-truth labels if you have them.
    df["risk_score"] = _compute_risk_label(df)

    # ── Drop rows with NaN in any model feature ───────────────────────────
    df = df.dropna(subset=required + ["risk_score"])
    return df[required + ["risk_score", "date", "latitude", "longitude",
                          "basin", "storm_type"]].reset_index(drop=True)
def _compute_risk_label(df: pd.DataFrame) -> pd.Series:
"""
Synthetic risk score [0, 1] derived from physical intensity signals.
Logic:
- High wind + low pressure β†’ high base risk
- Close to coast β†’ landfall amplification
- High storm surge β†’ direct impact multiplier
- SST > 28Β°C β†’ intensification bonus
"""
# Normalise each driver to [0, 1]
wind_norm = np.clip(df["wind_speed_kmh"] / 350.0, 0, 1)
pressure_norm = np.clip(
(1013 - df["central_pressure_hpa"]) / (1013 - 870), 0, 1
)
coast_norm = np.clip(1 - df["distance_to_coast_km"] / 500.0, 0, 1)
surge_norm = np.clip(df["storm_surge_potential"], 0, 1)
sst_bonus = np.clip((df["sea_surface_temp_c"] - 26) / 9, 0, 1)
score = (
0.30 * wind_norm +
0.25 * pressure_norm +
0.20 * coast_norm +
0.15 * surge_norm +
0.10 * sst_bonus
)
# Add small noise to prevent the model from memorising exact thresholds
noise = np.random.normal(0, 0.02, len(score))
return np.clip(score + noise, 0.0, 1.0)