# climate-risk-engine/src/pricing/zone_thresholds.py
# Last commit 740baf3 (jtlevine): Validate UHI_MODEL + THRESHOLD_MODE env vars; warn + fallback on typos
"""Zone-specific trigger threshold calibration.
Computes (alert_wbgt_c, payout_peak_wbgt_c) per zone from each zone's own
20-year ERA5-Land WBGT distribution, with UHI delta applied. Replaces the
global 35.1°C / 36.0°C thresholds used in Phase 1 with zone-relative
thresholds calibrated to the same percentiles (P90 alert, P97 payout) —
the actuarial pattern used by ARC, CCRIF, SEWA heat pilot.
Activated by the THRESHOLD_MODE env var:

    THRESHOLD_MODE=global (default)   use WBGT_THRESHOLD_C / PAYOUT_PEAK
    THRESHOLD_MODE=zone_specific      per-zone P90/P97 calibrated from local history

Cached to data/zone_thresholds.json for reuse; safe to delete to force
recompute after any UHI model change.
"""
from __future__ import annotations
import json
import logging
import os
from pathlib import Path
from typing import Tuple
import numpy as np
from src.indexing.heat_index import calculate_wbgt
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Accepted values for the THRESHOLD_MODE env var, and the mode used when the
# variable is unset or holds anything else (see _threshold_mode).
_VALID_THRESHOLD_MODES = {"global", "zone_specific"}
_THRESHOLD_MODE_DEFAULT = "global"

# parents[2] of this file (src/pricing/zone_thresholds.py), i.e. the
# repository root; the data files below are resolved relative to it.
_REPO_ROOT = Path(__file__).resolve().parents[2]
# Input: ERA5-Land history read by compute_zone_thresholds.
ERA5_PATH = _REPO_ROOT / "data" / "era5land_dar_es_salaam.json"
# Output: on-disk cache of the computed per-zone thresholds.
CACHE_PATH = _REPO_ROOT / "data" / "zone_thresholds.json"

# P90 for alert-tier trigger (matches grid-cell 35.1°C historical origin on
# raw ERA5-Land).
ALERT_PERCENTILE = 90
# P97 for payout-tier peak severity.
PAYOUT_PERCENTILE = 97
def _threshold_mode() -> str:
    """Return the active threshold mode, validated against the allowed set.

    Reads ``THRESHOLD_MODE`` from the environment on every call (nothing is
    cached), normalises it by stripping surrounding whitespace and
    lower-casing, and returns it when it is one of ``_VALID_THRESHOLD_MODES``.
    Any other value logs a warning and yields ``_THRESHOLD_MODE_DEFAULT``.
    """
    raw = os.environ.get("THRESHOLD_MODE", _THRESHOLD_MODE_DEFAULT)
    # Strip before comparing: values copied from .env files or CI settings
    # often carry stray whitespace, which should not be treated as a typo.
    mode = raw.strip().lower()
    if mode in _VALID_THRESHOLD_MODES:
        return mode
    # Log the value exactly as set so the operator can spot the typo.
    log.warning(
        "THRESHOLD_MODE=%r is not one of %s — falling back to %r",
        raw, sorted(_VALID_THRESHOLD_MODES), _THRESHOLD_MODE_DEFAULT,
    )
    return _THRESHOLD_MODE_DEFAULT
# In-memory cache so the pipeline per-zone loop doesn't re-read the JSON cache
# file on every get_zone_thresholds(zone) call. Keyed by UHI_MODEL so a flag
# change within the same process (e.g. tests) still picks up fresh values.
#
# Outer key: lower-cased UHI_MODEL env value ("synthetic" when unset).
# Value: the {zone_id: {...threshold fields...}} mapping returned by
# compute_zone_thresholds.
_IN_MEMORY_CACHE: dict[str, dict[str, dict[str, float]]] = {}
def _load_disk_thresholds(uhi_key: str) -> dict[str, dict[str, float]] | None:
    """Return the on-disk threshold cache if it is usable for ``uhi_key``.

    Returns ``None`` (meaning "recompute") when ``CACHE_PATH`` is missing,
    unreadable, not valid JSON, not a JSON object, or contains entries whose
    recorded ``uhi_model`` differs from ``uhi_key``. Entries without a
    ``uhi_model`` field are accepted as-is, since their origin cannot be
    checked.
    """
    try:
        cached = json.loads(CACHE_PATH.read_text())
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        log.warning("Ignoring unreadable threshold cache %s: %s", CACHE_PATH, exc)
        return None
    if not isinstance(cached, dict):
        log.warning("Ignoring threshold cache %s: expected a JSON object", CACHE_PATH)
        return None

    recorded_models = {
        str(rec["uhi_model"]).lower()
        for rec in cached.values()
        if isinstance(rec, dict) and "uhi_model" in rec
    }
    if recorded_models - {uhi_key}:
        # The file is a single snapshot; reusing it under another UHI model
        # would silently apply thresholds calibrated with the wrong deltas.
        log.warning(
            "Threshold cache %s was computed under UHI_MODEL=%s but the active "
            "model is %r — recomputing",
            CACHE_PATH, sorted(recorded_models), uhi_key,
        )
        return None
    return cached


def compute_zone_thresholds(use_cache: bool = True) -> dict[str, dict[str, float]]:
    """Return {zone_id: {alert_c, payout_c, n_days, mean_wbgt_c}} for Dar zones.

    Iterates each Dar zone, applies its UHI delta (from the active UHI model)
    to the 20-year ERA5-Land DAR-JAN grid-cell series, computes per-day WBGT,
    and extracts percentiles (``ALERT_PERCENTILE`` / ``PAYOUT_PERCENTILE``).
    Each record also carries a ``uhi_model`` string naming the model it was
    computed under.

    Cached in-process + on disk (``CACHE_PATH``) to avoid recomputing. The
    on-disk cache is only reused when it was written under the currently
    active ``UHI_MODEL``; a stale or corrupt file is ignored and overwritten.

    Args:
        use_cache: When true, return the in-memory or on-disk cached result
            if one is available. When false, always recompute. The fresh
            result is written to both caches in either case.

    Returns:
        Mapping of zone id to its threshold record. Zones whose city is not
        "Dar es Salaam", or for which no usable ERA5 row exists, are omitted.

    Raises:
        OSError, ValueError, KeyError: If the ERA5 input file is missing,
            malformed, or lacks the ``DAR-JAN`` series.
    """
    uhi_key = os.environ.get("UHI_MODEL", "synthetic").lower()
    if use_cache:
        cached = _IN_MEMORY_CACHE.get(uhi_key)
        if cached is not None:
            return cached
        cached = _load_disk_thresholds(uhi_key)
        if cached is not None:
            _IN_MEMORY_CACHE[uhi_key] = cached
            return cached

    # Kept function-local as in the original.
    # NOTE(review): presumably avoids an import cycle / heavy import — confirm.
    from config import ZONES
    from src.downscaling import get_uhi_corrector

    era5 = json.loads(ERA5_PATH.read_text())
    grid_rows = era5["DAR-JAN"]  # all 15 Dar zones resolve to this grid cell
    corrector = get_uhi_corrector()

    # Row filtering and month parsing do not depend on the zone, so do them
    # once: (temp_max_c, humidity_pct, month) for rows with both readings.
    usable_rows = [
        (float(r["temp_max_c"]), float(r["humidity_pct"]), int(r["date"][5:7]))
        for r in grid_rows
        if r.get("temp_max_c") is not None and r.get("humidity_pct") is not None
    ]

    out: dict[str, dict[str, float]] = {}
    for z in ZONES:
        if z.city != "Dar es Salaam":
            continue
        wbgts = []
        for temp_c, humidity, month in usable_rows:
            # Apply UHI correction at this zone for this month (mid-day)
            corrected_t, _, _ = corrector.correct_temperature(
                z, temp_c, hour=14, month=month
            )
            wbgts.append(calculate_wbgt(corrected_t, humidity))
        if not wbgts:
            continue
        arr = np.asarray(wbgts, dtype=np.float32)
        out[z.zone_id] = {
            "alert_c": round(float(np.percentile(arr, ALERT_PERCENTILE)), 2),
            "payout_c": round(float(np.percentile(arr, PAYOUT_PERCENTILE)), 2),
            "n_days": int(arr.size),
            "mean_wbgt_c": round(float(arr.mean()), 2),
            # Recorded so a later run can tell which model produced the file.
            "uhi_model": uhi_key,
        }

    CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
    CACHE_PATH.write_text(json.dumps(out, indent=2))
    _IN_MEMORY_CACHE[uhi_key] = out
    return out
# Global-mode fallback constants (imported by callers for back-compat).
# get_zone_thresholds() returns this (alert, payout-peak) pair whenever
# THRESHOLD_MODE is not zone_specific, threshold computation fails, or the
# zone has no computed entry.
GLOBAL_ALERT_C = 35.1
GLOBAL_PAYOUT_PEAK_C = 36.0
def get_zone_thresholds(zone) -> Tuple[float, float]:
    """Return (alert_c, payout_peak_c) for a zone.

    THRESHOLD_MODE=zone_specific: zone's own P90/P97 from historical series.
    THRESHOLD_MODE=global (default): (35.1, 36.0) regardless of zone.

    Zones without pre-computed thresholds (non-Dar zones, or first-ever run
    before cache exists) fall back to global. A failure while loading or
    computing the zone thresholds also falls back to global, and is logged
    with its traceback so the degraded mode is visible.

    Args:
        zone: Object exposing a ``zone_id`` attribute used as the lookup key.

    Returns:
        ``(alert_c, payout_peak_c)`` as floats.
    """
    global_pair = (GLOBAL_ALERT_C, GLOBAL_PAYOUT_PEAK_C)
    if _threshold_mode() != "zone_specific":
        return global_pair
    try:
        thresholds = compute_zone_thresholds(use_cache=True)
    except Exception:
        # Deliberate best-effort: pricing must keep working without the
        # calibration data. Log instead of swallowing silently, otherwise a
        # broken zone_specific setup is indistinguishable from global mode.
        log.warning(
            "Zone threshold computation failed — falling back to global "
            "thresholds (%s / %s)",
            GLOBAL_ALERT_C, GLOBAL_PAYOUT_PEAK_C,
            exc_info=True,
        )
        return global_pair
    zt = thresholds.get(zone.zone_id)
    if zt is None:
        return global_pair
    return float(zt["alert_c"]), float(zt["payout_c"])
def current_mode() -> str:
    """Return the validated threshold mode currently in effect.

    Public wrapper around ``_threshold_mode``; the environment is consulted
    on each call.
    """
    active_mode = _threshold_mode()
    return active_mode
def persist_to_neon(conn, thresholds: dict[str, dict[str, float]] | None = None) -> int:
    """UPSERT zone_thresholds rows into Neon so the Vercel frontend can read
    per-zone values via the ``zones`` serverless endpoint.

    Called from the pipeline after ``compute_zone_thresholds``. Skipped silently
    when THRESHOLD_MODE != zone_specific — under global mode, the frontend's
    fallback (35.1 / 36.0) is the correct display value.

    Args:
        conn: Open database connection providing ``cursor()``, ``commit()``
            and ``rollback()``; the statement uses ``%s`` placeholders.
        thresholds: Mapping as returned by ``compute_zone_thresholds``. When
            ``None``, it is obtained from ``compute_zone_thresholds`` (cached).

    Returns:
        Number of rows upserted; ``0`` when skipped or nothing to write.

    Raises:
        Exception: Any database error is re-raised after the transaction has
            been rolled back and the cursor closed.
    """
    mode = _threshold_mode()
    if mode != "zone_specific":
        return 0
    if thresholds is None:
        thresholds = compute_zone_thresholds(use_cache=True)
    if not thresholds:
        return 0

    uhi_model = os.environ.get("UHI_MODEL", "synthetic").lower()
    rows = [
        (zid, float(v["alert_c"]), float(v["payout_c"]), uhi_model, mode)
        for zid, v in thresholds.items()
    ]

    cur = conn.cursor()
    try:
        cur.executemany(
            """
            INSERT INTO zone_thresholds
            (zone_id, alert_threshold_c, payout_threshold_c, uhi_model, threshold_mode, computed_at)
            VALUES (%s, %s, %s, %s, %s, NOW())
            ON CONFLICT (zone_id) DO UPDATE SET
            alert_threshold_c = EXCLUDED.alert_threshold_c,
            payout_threshold_c = EXCLUDED.payout_threshold_c,
            uhi_model = EXCLUDED.uhi_model,
            threshold_mode = EXCLUDED.threshold_mode,
            computed_at = NOW()
            """,
            rows,
        )
        conn.commit()
    except Exception:
        # Leave the connection usable for the caller instead of stuck in a
        # failed transaction, then surface the original error.
        conn.rollback()
        raise
    finally:
        # Always release the cursor, including on the error path.
        cur.close()
    return len(rows)
if __name__ == "__main__":
    # Command-line entry point: recompute every zone threshold from scratch,
    # print them as a table, and report where the cache was written.
    import sys

    sys.path.insert(0, str(_REPO_ROOT))

    # Drop any existing on-disk cache so the run below starts clean.
    if CACHE_PATH.exists():
        CACHE_PATH.unlink()

    # Default to the "lst" UHI model unless the caller already chose one;
    # setdefault returns whichever value ends up in the environment.
    active_model = os.environ.setdefault("UHI_MODEL", "lst")
    print(f"Computing zone thresholds under UHI_MODEL={active_model}...")

    zone_table = compute_zone_thresholds(use_cache=False)

    print(f"\n{'zone':9s} {'alert':>7s} {'payout':>7s} {'mean':>7s} {'n':>6s}")
    for zone_id in sorted(zone_table):
        rec = zone_table[zone_id]
        print(f"{zone_id:9s} {rec['alert_c']:>7.2f} {rec['payout_c']:>7.2f} "
              f"{rec['mean_wbgt_c']:>7.2f} {rec['n_days']:>6d}")

    print(f"\nGlobal reference (THRESHOLD_MODE=global): alert={GLOBAL_ALERT_C} payout={GLOBAL_PAYOUT_PEAK_C}")
    print(f"\nCached at: {CACHE_PATH}")