climate-risk-engine / scripts /fetch_era5land.py
jtlevine's picture
Make fork-and-adapt flow truly region-agnostic
8c109d1
"""
Fetch ERA5-Land daily data for the primary-city zones via the CDS API.
ERA5-Land: 9km resolution (0.1°) — 6x finer than NASA POWER (0.5°/55km).
At 9km, neighborhood-scale zones typically resolve to 2-3 distinct grid cells.
Fetches one month at a time to stay within CDS cost limits.
Caches monthly NetCDF files in data/era5land_cache/.
Usage:
python3 scripts/fetch_era5land.py # uses config.PRIMARY_CITY
python3 scripts/fetch_era5land.py --city "Kampala" # different city
Output: data/era5land_{city_slug}.json
"""
import argparse
import json
import math
import sys
import time
from pathlib import Path
import numpy as np
# Make project root importable so config.py resolves
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from config import ZONES as CONFIG_ZONES, PRIMARY_CITY, CITIES, slug_for
def bbox_for_zones(zones, margin: float = 0.05):
    """Return the padded (north, south, west, east) extent covering *zones*.

    Each zone must expose ``latitude`` and ``longitude`` attributes. The
    extreme coordinates are pushed outwards by ``margin`` degrees on every
    side; the default of 0.05 is about half of one 0.1-degree ERA5-Land
    cell, so that each zone keeps a grid point nearby for the
    nearest-neighbour lookup.

    Note the return order is N, S, W, E, which is not the order the CDS
    ``area`` key uses; callers have to reorder.
    """
    lat_values = [zone.latitude for zone in zones]
    lon_values = [zone.longitude for zone in zones]
    return (
        max(lat_values) + margin,  # north
        min(lat_values) - margin,  # south
        min(lon_values) - margin,  # west
        max(lon_values) + margin,  # east
    )
def fetch_month(client, year, month, area, cache_prefix, cache_dir):
    """Fetch one month of ERA5-Land data into the cache.

    Args:
        client: Object exposing ``retrieve(dataset, request, target)``
            (a ``cdsapi.Client``). It is not touched on a cache hit.
        year: Calendar year to request.
        month: Calendar month (1-12) to request.
        area: Bounding box in CDS order ``(N, W, S, E)``.
        cache_prefix: File-name prefix that identifies the city.
        cache_dir: ``Path`` of the directory holding the monthly files.

    Returns:
        Path of the cached NetCDF file, or ``None`` when the request failed.
    """
    path = cache_dir / f"{cache_prefix}_{year}_{month:02d}.nc"
    # Files of 1000 bytes or less are not trusted as a cache hit
    # (presumably truncated or error responses) and are fetched again.
    if path.exists() and path.stat().st_size > 1000:
        return path

    # Download next to the final name and rename only on success. Writing
    # straight to ``path`` would let an interrupted run (Ctrl-C, kill)
    # leave a truncated file that later runs accept as a valid cache entry.
    tmp_path = path.with_name(path.name + ".part")
    print(f" {year}-{month:02d}: requesting...", end="", flush=True)
    try:
        client.retrieve(
            "reanalysis-era5-land",
            {
                "variable": [
                    "2m_temperature",
                    "2m_dewpoint_temperature",
                    "10m_u_component_of_wind",
                    "10m_v_component_of_wind",
                    "surface_solar_radiation_downwards",
                    "total_precipitation",
                ],
                "year": str(year),
                "month": f"{month:02d}",
                # Days 01-31 are always listed, whatever the month's length.
                "day": [f"{d:02d}" for d in range(1, 32)],
                "time": ["06:00", "12:00", "18:00"],
                "area": list(area),  # [N, W, S, E]
                "data_format": "netcdf",
            },
            str(tmp_path),
        )
        tmp_path.replace(path)
        size_kb = path.stat().st_size / 1024
        print(f" ok ({size_kb:.0f} KB)")
        return path
    except Exception as e:
        # Best effort: report the month as failed and let the caller carry on.
        print(f" FAILED: {e}")
        if path.exists():
            path.unlink()
        return None
    finally:
        # Also runs for KeyboardInterrupt/SystemExit, so no partial download
        # is left behind.
        if tmp_path.exists():
            tmp_path.unlink()
def extract_all_zones(nc_path, zones):
    """Extract daily records for all zones from a single NetCDF file.

    The file is opened once and every variable is read fully into memory;
    per-zone records are then computed from in-memory slices, so the cost of
    opening and reading does not grow with the number of zones.

    Args:
        nc_path: Path of a monthly file holding ``t2m``, ``d2m``, ``u10``,
            ``v10``, ``ssrd`` and ``tp`` indexed as (time, latitude,
            longitude), plus ``latitude``/``longitude`` and a ``valid_time``
            or ``time`` coordinate.
        zones: Objects exposing ``zone_id``, ``latitude`` and ``longitude``.

    Returns:
        Dict mapping each ``zone_id`` to its list of daily records.
    """
    import netCDF4 as nc

    # The context manager closes the file even if a variable is missing or a
    # read fails part-way through.
    with nc.Dataset(str(nc_path), "r") as ds:
        lats = np.asarray(ds.variables["latitude"][:], dtype=float)
        lons = np.asarray(ds.variables["longitude"][:], dtype=float)
        time_var = "valid_time" if "valid_time" in ds.variables else "time"
        tvar = ds.variables[time_var]
        times = nc.num2date(
            tvar[:],
            tvar.units,
            getattr(tvar, "calendar", "standard"),
        )
        t2m_all = ds.variables["t2m"][:]
        d2m_all = ds.variables["d2m"][:]
        u10_all = ds.variables["u10"][:]
        v10_all = ds.variables["v10"][:]
        ssrd_all = ds.variables["ssrd"][:]
        tp_all = ds.variables["tp"][:]

    # ERA5-Land is a land-only product, so cells over water come back
    # masked/NaN. A cell is usable when it has a temperature for at least
    # one time step.
    missing = np.ma.getmaskarray(np.ma.masked_invalid(t2m_all))
    valid = ~missing.all(axis=0)

    result = {}
    for zone in zones:
        lat_idx, lon_idx = _nearest_valid_cell(
            lats, lons, valid, zone.latitude, zone.longitude
        )
        result[zone.zone_id] = _daily_records(
            times,
            t2m_all[:, lat_idx, lon_idx],
            d2m_all[:, lat_idx, lon_idx],
            u10_all[:, lat_idx, lon_idx],
            v10_all[:, lat_idx, lon_idx],
            ssrd_all[:, lat_idx, lon_idx],
            tp_all[:, lat_idx, lon_idx],
        )
    return result


def _nearest_valid_cell(lats, lons, valid, lat, lon):
    """Return (lat_idx, lon_idx) of the closest grid cell that holds data.

    The plain nearest grid point is used whenever it has data (or when no
    cell in the file does, in which case there is nothing better to pick).
    Otherwise the closest cell flagged in ``valid`` is chosen, using squared
    distance in degrees.
    """
    lat_idx = int(np.argmin(np.abs(lats - lat)))
    lon_idx = int(np.argmin(np.abs(lons - lon)))
    if valid[lat_idx, lon_idx] or not valid.any():
        return lat_idx, lon_idx

    dist2 = (lats[:, None] - lat) ** 2 + (lons[None, :] - lon) ** 2
    flat = int(np.argmin(np.where(valid, dist2, np.inf)))
    near_lat, near_lon = np.unravel_index(flat, valid.shape)
    return int(near_lat), int(near_lon)
def _daily_records(times, t2m, d2m, u10, v10, ssrd, tp):
"""Aggregate hourly samples into per-day summary records for one grid point."""
daily = {}
for i, t in enumerate(times):
day_str = t.strftime("%Y-%m-%d") if hasattr(t, "strftime") else str(t)[:10]
if day_str not in daily:
daily[day_str] = {"t2m": [], "d2m": [], "u10": [], "v10": [], "ssrd": [], "tp": []}
daily[day_str]["t2m"].append(float(t2m[i]))
daily[day_str]["d2m"].append(float(d2m[i]))
daily[day_str]["u10"].append(float(u10[i]))
daily[day_str]["v10"].append(float(v10[i]))
daily[day_str]["ssrd"].append(float(ssrd[i]))
daily[day_str]["tp"].append(float(tp[i]))
records = []
for day_str in sorted(daily.keys()):
d = daily[day_str]
if not d["t2m"]:
continue
temps_c = [t - 273.15 for t in d["t2m"]]
dewpoints_c = [t - 273.15 for t in d["d2m"]]
temp_mean = sum(temps_c) / len(temps_c)
mean_dew = sum(dewpoints_c) / len(dewpoints_c)
es = 6.112 * math.exp(17.67 * temp_mean / (temp_mean + 243.5))
ea = 6.112 * math.exp(17.67 * mean_dew / (mean_dew + 243.5))
humidity = min(100.0, max(0.0, (ea / es) * 100.0)) if es > 0 else 50.0
wind_speeds = [math.sqrt(u**2 + v**2) for u, v in zip(d["u10"], d["v10"])]
solar_vals = [max(0, s / 10800.0) for s in d["ssrd"]]
records.append({
"date": day_str,
"temp_max_c": round(max(temps_c), 2),
"temp_min_c": round(min(temps_c), 2),
"temp_mean_c": round(temp_mean, 2),
"humidity_pct": round(humidity, 1),
"wind_speed_ms": round(sum(wind_speeds) / len(wind_speeds), 2),
"solar_rad_wm2": round(sum(solar_vals) / len(solar_vals), 1),
"precip_mm": round(sum(max(0, p * 1000.0) for p in d["tp"]), 2),
})
return records
def main():
    """Fetch, cache and aggregate ERA5-Land data for one configured city.

    Steps:
      1. Parse ``--city``, ``--start-year`` and ``--end-year``.
      2. Select the configured zones of that city and derive their bounding box.
      3. Download every month of the period that is not cached yet.
      4. Turn each cached month into per-zone daily records.
      5. Write ``data/era5land_{slug}.json`` and print a short comparison of
         the zones.

    Exits with status 1 when the city has no zones, and with an argparse
    usage error when the year range is inverted.
    """
    default_city = PRIMARY_CITY or (CITIES[0] if CITIES else "Dar es Salaam")
    parser = argparse.ArgumentParser(
        description="Fetch ERA5-Land data for the configured primary city's zones."
    )
    parser.add_argument(
        "--city",
        default=default_city,
        help=f"City to fetch (default: {default_city} from config.PRIMARY_CITY)",
    )
    parser.add_argument("--start-year", type=int, default=2005, help="First year to fetch")
    parser.add_argument("--end-year", type=int, default=2024, help="Last year to fetch")
    args = parser.parse_args()
    if args.start_year > args.end_year:
        # An inverted range would otherwise silently write an empty output file.
        parser.error(
            f"--start-year ({args.start_year}) must not be after "
            f"--end-year ({args.end_year})"
        )

    city = args.city
    slug = slug_for(city)

    # Filter zones for the target city
    city_zones = [z for z in CONFIG_ZONES if z.city == city]
    if not city_zones:
        print(f"ERROR: No zones found for city '{city}'")
        print(f" Available cities: {sorted(set(z.city for z in CONFIG_ZONES))}")
        sys.exit(1)

    data_dir = Path(__file__).resolve().parents[1] / "data"
    cache_dir = data_dir / "era5land_cache"
    output_file = data_dir / f"era5land_{slug}.json"
    cache_prefix = f"era5land_{slug}"
    data_dir.mkdir(parents=True, exist_ok=True)
    cache_dir.mkdir(exist_ok=True)

    def month_file(year, month):
        """Path of the cached NetCDF file for one month."""
        return cache_dir / f"{cache_prefix}_{year}_{month:02d}.nc"

    def is_cached(path):
        """Same test fetch_month applies before deciding to download."""
        return path.exists() and path.stat().st_size > 1000

    north, south, west, east = bbox_for_zones(city_zones)
    # CDS API "area" is [N, W, S, E]
    area = (north, west, south, east)

    years = range(args.start_year, args.end_year + 1)
    total_months = len(years) * 12
    # Count only usable files of the requested period. A glob on the prefix
    # would also pick up other years, truncated files and any other slug
    # that shares this prefix.
    cached = sum(is_cached(month_file(y, m)) for y in years for m in range(1, 13))
    print(f"ERA5-Land fetch for {city}")
    print(f" Zones: {len(city_zones)} ({', '.join(z.name for z in city_zones)})")
    print(f" Period: {args.start_year}-{args.end_year} ({total_months} months)")
    print(f" Cached: {cached} months")
    print(f" Bounding box: N={north:.2f}, S={south:.2f}, W={west:.2f}, E={east:.2f}")
    print(f" Output: {output_file}")
    print()

    # Fetch all months. The CDS client is created on the first real download,
    # so a fully cached period can be re-aggregated without cdsapi
    # credentials; fetch_month never touches the client on a cache hit.
    client = None
    failed = []
    for year in years:
        print(f"{year}:")
        for month in range(1, 13):
            needs_download = not is_cached(month_file(year, month))
            if needs_download and client is None:
                import cdsapi
                client = cdsapi.Client()
            result = fetch_month(client, year, month, area, cache_prefix, cache_dir)
            if result is None:
                failed.append(f"{year}-{month:02d}")
            if needs_download:
                # Rate limiting applies to real requests only; sleeping after
                # cache hits added minutes to a fully cached rerun.
                time.sleep(1)
    if failed:
        print(f"\nWARNING: {len(failed)} months failed: {failed[:10]}")

    # Extract per-zone data
    print("\nExtracting per-zone daily data...")
    all_data = {z.zone_id: [] for z in city_zones}
    for year in years:
        for month in range(1, 13):
            nc_path = month_file(year, month)
            if not nc_path.exists() or nc_path.stat().st_size < 1000:
                continue
            per_zone = extract_all_zones(nc_path, city_zones)
            for zid, records in per_zone.items():
                all_data[zid].extend(records)

    # Save
    with open(output_file, "w") as f:
        json.dump(all_data, f)
    size_mb = output_file.stat().st_size / 1e6
    print(f"\nSaved to {output_file} ({size_mb:.1f} MB)")

    # Check if zones got different data (the whole point of ERA5-Land)
    print("\n=== Spatial differentiation check ===")
    zone_name = {z.zone_id: z.name for z in city_zones}
    for zid, records in all_data.items():
        if not records:
            print(f" {zid}: NO DATA")
            continue
        temps = [r["temp_max_c"] for r in records]
        humids = [r["humidity_pct"] for r in records]
        print(
            f" {zid} ({zone_name[zid]:12s}): {len(records)} days, "
            f"temp_max mean={sum(temps)/len(temps):.2f}°C, "
            f"humidity mean={sum(humids)/len(humids):.1f}%"
        )

    # Check if grid points differ. Only the first two zones are compared, and
    # only when their series have equal length so days pair up one-to-one.
    if len(all_data) >= 2:
        z1, z2 = list(all_data.keys())[:2]
        r1, r2 = all_data[z1], all_data[z2]
        if r1 and r2 and len(r1) == len(r2):
            diffs = [abs(a["temp_max_c"] - b["temp_max_c"]) for a, b in zip(r1, r2)]
            mean_diff = sum(diffs) / len(diffs)
            print(f"\n Mean temp difference {z1} vs {z2}: {mean_diff:.3f}°C")
            if mean_diff > 0.01:
                print(" ERA5-Land is resolving different grid cells!")
            else:
                print(" Same grid cell (zones too close for 9km resolution)")
# Run the fetch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()