"""Zone-specific trigger threshold calibration. Computes (alert_wbgt_c, payout_peak_wbgt_c) per zone from each zone's own 20-year ERA5-Land WBGT distribution, with UHI delta applied. Replaces the global 35.1°C / 36.0°C thresholds used in Phase 1 with zone-relative thresholds calibrated to the same percentiles (P90 alert, P97 payout) — the actuarial pattern used by ARC, CCRIF, SEWA heat pilot. Activated by THRESHOLD_MODE env var: THRESHOLD_MODE=global (default) use WBGT_THRESHOLD_C / PAYOUT_PEAK THRESHOLD_MODE=zone_specific per-zone P90/P97 calibrated from local history Cached to data/zone_thresholds.json for reuse; safe to delete to force recompute after any UHI model change. """ from __future__ import annotations import json import logging import os from pathlib import Path from typing import Tuple import numpy as np from src.indexing.heat_index import calculate_wbgt log = logging.getLogger(__name__) _VALID_THRESHOLD_MODES = {"global", "zone_specific"} _THRESHOLD_MODE_DEFAULT = "global" _REPO_ROOT = Path(__file__).resolve().parents[2] ERA5_PATH = _REPO_ROOT / "data" / "era5land_dar_es_salaam.json" CACHE_PATH = _REPO_ROOT / "data" / "zone_thresholds.json" ALERT_PERCENTILE = 90 # P90 for alert-tier trigger (matches grid-cell # 35.1°C historical origin on raw ERA5-Land) PAYOUT_PERCENTILE = 97 # P97 for payout-tier peak severity def _threshold_mode() -> str: raw = os.environ.get("THRESHOLD_MODE", _THRESHOLD_MODE_DEFAULT).lower() if raw not in _VALID_THRESHOLD_MODES: log.warning( "THRESHOLD_MODE=%r is not one of %s — falling back to %r", raw, sorted(_VALID_THRESHOLD_MODES), _THRESHOLD_MODE_DEFAULT, ) return _THRESHOLD_MODE_DEFAULT return raw # In-memory cache so the pipeline per-zone loop doesn't re-read the JSON cache # file on every get_zone_thresholds(zone) call. Keyed by UHI_MODEL so a flag # change within the same process (e.g. tests) still picks up fresh values. 
_IN_MEMORY_CACHE: dict[str, dict[str, dict[str, float]]] = {}


def _load_disk_cache(uhi_key: str) -> dict[str, dict[str, float]] | None:
    """Return the on-disk thresholds if they exist, parse, and match *uhi_key*.

    Returns ``None`` (meaning "recompute") when ``CACHE_PATH`` is missing,
    unreadable, not a JSON object, or when any entry's ``uhi_model`` differs
    from the currently active UHI model. Without that last check, a cache
    built under one UHI model would be served (and pinned in-memory) after
    switching ``UHI_MODEL``.
    """
    if not CACHE_PATH.exists():
        return None
    try:
        cached = json.loads(CACHE_PATH.read_text())
    except (OSError, ValueError):  # ValueError covers JSONDecodeError
        log.warning(
            "Zone-threshold cache %s is unreadable — recomputing",
            CACHE_PATH,
            exc_info=True,
        )
        return None
    if not isinstance(cached, dict):
        log.warning("Zone-threshold cache %s is not a JSON object — recomputing", CACHE_PATH)
        return None
    if any(
        not isinstance(entry, dict) or entry.get("uhi_model") != uhi_key
        for entry in cached.values()
    ):
        log.info(
            "Zone-threshold cache %s was built for a different UHI model than %r — recomputing",
            CACHE_PATH,
            uhi_key,
        )
        return None
    return cached


def _write_disk_cache(thresholds: dict[str, dict[str, float]]) -> None:
    """Write *thresholds* to ``CACHE_PATH`` via a temp file + atomic replace.

    Writing to a sibling temp file first means a concurrent reader never sees
    a half-written JSON file. The cache is an optimisation only, so an
    ``OSError`` (e.g. read-only checkout) is logged instead of raised.
    """
    tmp_path = CACHE_PATH.with_name(CACHE_PATH.name + ".tmp")
    try:
        CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
        tmp_path.write_text(json.dumps(thresholds, indent=2))
        tmp_path.replace(CACHE_PATH)
    except OSError:
        log.warning("Could not write zone-threshold cache %s", CACHE_PATH, exc_info=True)


def compute_zone_thresholds(use_cache: bool = True) -> dict[str, dict[str, float]]:
    """Return ``{zone_id: {alert_c, payout_c, n_days, mean_wbgt_c, uhi_model}}`` for Dar zones.

    For each zone whose ``city`` is ``"Dar es Salaam"``, applies that zone's
    UHI correction (from the active UHI model, evaluated at hour 14 of each
    row's month) to the 20-year ERA5-Land ``DAR-JAN`` grid-cell series,
    converts each day to WBGT, and extracts the ``ALERT_PERCENTILE`` /
    ``PAYOUT_PERCENTILE`` percentiles. Rows missing ``temp_max_c`` or
    ``humidity_pct`` are skipped; zones with no usable rows are omitted.

    Results are cached in-process (keyed by ``UHI_MODEL``) and on disk at
    ``CACHE_PATH``. The disk cache is reused only when it parses and every
    entry was computed under the currently active ``UHI_MODEL``.

    Args:
        use_cache: when False, ignore both caches and always recompute; the
            fresh result still refreshes both caches.

    Returns:
        Mapping of zone_id to its calibrated thresholds and summary stats.

    Raises:
        Errors from reading/parsing ``ERA5_PATH`` or from the lazily imported
        project modules propagate to the caller.
    """
    uhi_key = os.environ.get("UHI_MODEL", "synthetic").lower()
    if use_cache:
        cached = _IN_MEMORY_CACHE.get(uhi_key)
        if cached is not None:
            return cached
        cached = _load_disk_cache(uhi_key)
        if cached is not None:
            _IN_MEMORY_CACHE[uhi_key] = cached
            return cached

    # Imported inside the function so the sys.path tweak in ``__main__`` takes
    # effect before ``config`` is resolved (presumably also avoids import-time
    # cycles — confirm before hoisting).
    from config import ZONES
    from src.downscaling import get_uhi_corrector

    era5 = json.loads(ERA5_PATH.read_text())
    grid_rows = era5["DAR-JAN"]  # all 15 Dar zones resolve to this grid cell
    corrector = get_uhi_corrector()

    out: dict[str, dict[str, float]] = {}
    for z in ZONES:
        if z.city != "Dar es Salaam":
            continue
        wbgts = []
        for r in grid_rows:
            t = r.get("temp_max_c")
            h = r.get("humidity_pct")
            if t is None or h is None:
                continue
            month = int(r["date"][5:7])  # assumes ISO "YYYY-MM-DD" dates
            # Apply UHI correction at this zone for this month (mid-day)
            corrected_t, _, _ = corrector.correct_temperature(z, float(t), hour=14, month=month)
            wbgts.append(calculate_wbgt(corrected_t, float(h)))
        if not wbgts:
            continue
        arr = np.asarray(wbgts, dtype=np.float32)
        out[z.zone_id] = {
            "alert_c": round(float(np.percentile(arr, ALERT_PERCENTILE)), 2),
            "payout_c": round(float(np.percentile(arr, PAYOUT_PERCENTILE)), 2),
            "n_days": int(arr.size),
            "mean_wbgt_c": round(float(arr.mean()), 2),
            # Recorded so _load_disk_cache can reject a cache built under a
            # different UHI model.
            "uhi_model": uhi_key,
        }

    _write_disk_cache(out)
    _IN_MEMORY_CACHE[uhi_key] = out
    return out


# Global-mode fallback constants (imported by callers for back-compat)
GLOBAL_ALERT_C = 35.1
GLOBAL_PAYOUT_PEAK_C = 36.0


def get_zone_thresholds(zone) -> Tuple[float, float]:
    """Return ``(alert_c, payout_peak_c)`` for *zone*.

    * ``THRESHOLD_MODE=zone_specific``: the zone's own P90/P97 from its
      historical series (computed on first use if no valid cache exists).
    * ``THRESHOLD_MODE=global`` (default): ``(35.1, 36.0)`` regardless of zone.

    Falls back to the global pair when the zone has no calibrated entry
    (e.g. non-Dar zones) or when calibration raises. Calibration failures
    are logged with a traceback rather than raised, so the pipeline keeps
    running but the switch to global thresholds is visible.

    Args:
        zone: object exposing a ``zone_id`` attribute.
    """
    if _threshold_mode() != "zone_specific":
        return GLOBAL_ALERT_C, GLOBAL_PAYOUT_PEAK_C
    try:
        thresholds = compute_zone_thresholds(use_cache=True)
    except Exception:
        # Deliberate best-effort fallback, but never silent: a hidden revert
        # to global payout thresholds is an actuarial change.
        log.warning(
            "Zone-specific threshold calibration failed — using global thresholds for zone %s",
            getattr(zone, "zone_id", zone),
            exc_info=True,
        )
        return GLOBAL_ALERT_C, GLOBAL_PAYOUT_PEAK_C
    zt = thresholds.get(zone.zone_id)
    if zt is None:
        return GLOBAL_ALERT_C, GLOBAL_PAYOUT_PEAK_C
    return float(zt["alert_c"]), float(zt["payout_c"])


def current_mode() -> str:
    """Return the active threshold mode (``"global"`` or ``"zone_specific"``)."""
    return _threshold_mode()


def persist_to_neon(conn, thresholds: dict[str, dict[str, float]] | None = None) -> int:
    """UPSERT zone_thresholds rows into Neon so the Vercel frontend can read
    per-zone values via the ``zones`` serverless endpoint.

    Called from the pipeline after ``compute_zone_thresholds``. Skipped
    silently when THRESHOLD_MODE != zone_specific — under global mode, the
    frontend's fallback (35.1 / 36.0) is the correct display value.

    Args:
        conn: DB-API connection (``cursor()``, ``commit()``, ``rollback()``)
            using ``%s`` placeholders.
        thresholds: mapping as returned by ``compute_zone_thresholds``; computed
            (or loaded from cache) when omitted.

    Returns:
        Number of rows upserted (0 when skipped or nothing to write).

    Raises:
        Any database error from the UPSERT, after rolling the transaction back.
    """
    mode = _threshold_mode()
    if mode != "zone_specific":
        return 0
    if thresholds is None:
        thresholds = compute_zone_thresholds(use_cache=True)
    if not thresholds:
        return 0

    default_uhi_model = os.environ.get("UHI_MODEL", "synthetic").lower()
    rows = [
        (
            zid,
            float(v["alert_c"]),
            float(v["payout_c"]),
            # Record the UHI model the thresholds were actually computed
            # under; fall back to the active one for entries lacking it.
            v.get("uhi_model", default_uhi_model),
            mode,
        )
        for zid, v in thresholds.items()
    ]

    cur = conn.cursor()
    try:
        cur.executemany(
            """
            INSERT INTO zone_thresholds
                (zone_id, alert_threshold_c, payout_threshold_c,
                 uhi_model, threshold_mode, computed_at)
            VALUES (%s, %s, %s, %s, %s, NOW())
            ON CONFLICT (zone_id) DO UPDATE SET
                alert_threshold_c  = EXCLUDED.alert_threshold_c,
                payout_threshold_c = EXCLUDED.payout_threshold_c,
                uhi_model          = EXCLUDED.uhi_model,
                threshold_mode     = EXCLUDED.threshold_mode,
                computed_at        = NOW()
            """,
            rows,
        )
        conn.commit()
    except Exception:
        # Leave the connection usable for the caller instead of stuck in an
        # aborted transaction.
        conn.rollback()
        raise
    finally:
        cur.close()
    return len(rows)


if __name__ == "__main__":
    import sys

    sys.path.insert(0, str(_REPO_ROOT))
    # Force recompute: drop any on-disk cache before calibrating.
    if CACHE_PATH.exists():
        CACHE_PATH.unlink()
    os.environ.setdefault("UHI_MODEL", "lst")
    print(f"Computing zone thresholds under UHI_MODEL={os.environ['UHI_MODEL']}...")
    result = compute_zone_thresholds(use_cache=False)
    print(f"\n{'zone':9s} {'alert':>7s} {'payout':>7s} {'mean':>7s} {'n':>6s}")
    for zid, v in sorted(result.items()):
        print(f"{zid:9s} {v['alert_c']:>7.2f} {v['payout_c']:>7.2f} "
              f"{v['mean_wbgt_c']:>7.2f} {v['n_days']:>6d}")
    print(f"\nGlobal reference (THRESHOLD_MODE=global): alert={GLOBAL_ALERT_C} payout={GLOBAL_PAYOUT_PEAK_C}")
    print(f"\nCached at: {CACHE_PATH}")