| """Zone-specific trigger threshold calibration. |
| |
| Computes (alert_wbgt_c, payout_peak_wbgt_c) per zone from each zone's own |
| 20-year ERA5-Land WBGT distribution, with UHI delta applied. Replaces the |
| global 35.1°C / 36.0°C thresholds used in Phase 1 with zone-relative |
| thresholds calibrated to the same percentiles (P90 alert, P97 payout) — |
| the actuarial pattern used by ARC, CCRIF, and the SEWA heat pilot. |
| |
| Activated by THRESHOLD_MODE env var: |
| THRESHOLD_MODE=global (default) use WBGT_THRESHOLD_C / PAYOUT_PEAK |
| THRESHOLD_MODE=zone_specific per-zone P90/P97 calibrated from local history |
| |
| Cached to data/zone_thresholds.json for reuse; safe to delete to force |
| recompute after any UHI model change. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| from pathlib import Path |
| from typing import Tuple |
|
|
| import numpy as np |
|
|
| from src.indexing.heat_index import calculate_wbgt |
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# THRESHOLD_MODE values this module understands, and the value used when the
# variable is unset or unrecognised.
_THRESHOLD_MODE_DEFAULT = "global"
_VALID_THRESHOLD_MODES = {_THRESHOLD_MODE_DEFAULT, "zone_specific"}


# The repo root is taken to be two levels above the directory containing this
# file; both the ERA5-Land input and the threshold cache live in <root>/data.
_REPO_ROOT = Path(__file__).resolve().parents[2]
ERA5_PATH = _REPO_ROOT.joinpath("data", "era5land_dar_es_salaam.json")
CACHE_PATH = _REPO_ROOT.joinpath("data", "zone_thresholds.json")


# Percentiles of each zone's WBGT distribution used for the two triggers:
# the payout threshold sits higher in the tail than the alert threshold.
PAYOUT_PERCENTILE = 97
ALERT_PERCENTILE = 90
|
|
|
|
def _threshold_mode() -> str:
    """Return the active threshold mode read from the THRESHOLD_MODE env var.

    The value is matched case-insensitively and with surrounding whitespace
    ignored, so a stray space or trailing newline (common when the value comes
    from an env file) does not silently demote ``zone_specific`` to the default.

    Returns:
        One of ``_VALID_THRESHOLD_MODES``. When the variable is unset the
        default mode is returned; when it holds an unrecognised value a
        warning is logged and the default mode is returned.
    """
    raw = os.environ.get("THRESHOLD_MODE", _THRESHOLD_MODE_DEFAULT)
    mode = raw.strip().lower()
    if mode in _VALID_THRESHOLD_MODES:
        return mode
    # Log the value exactly as it was set so the misconfiguration is easy to
    # spot in the environment.
    log.warning(
        "THRESHOLD_MODE=%r is not one of %s — falling back to %r",
        raw, sorted(_VALID_THRESHOLD_MODES), _THRESHOLD_MODE_DEFAULT,
    )
    return _THRESHOLD_MODE_DEFAULT
|
|
|
|
| |
| |
| |
# Process-local memo of computed thresholds, keyed by the lower-cased UHI_MODEL
# value that was active when they were computed or loaded.
_IN_MEMORY_CACHE: dict[str, dict[str, dict[str, float]]] = {}


def _load_disk_cache(uhi_key: str) -> dict[str, dict[str, float]] | None:
    """Return the thresholds stored at ``CACHE_PATH`` if usable for *uhi_key*.

    Returns ``None`` (a cache miss) when the file is absent, unreadable, not
    the expected mapping-of-mappings, or was written under a different UHI
    model. Entries that carry no ``uhi_model`` field are accepted as-is.
    """
    try:
        cached = json.loads(CACHE_PATH.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.warning("Ignoring unreadable threshold cache %s: %s", CACHE_PATH, exc)
        return None
    if not isinstance(cached, dict) or not all(
        isinstance(entry, dict) for entry in cached.values()
    ):
        log.warning("Ignoring malformed threshold cache %s", CACHE_PATH)
        return None
    # Every entry records the UHI model it was computed with; thresholds from
    # another model must not be served under the current one.
    if any(entry.get("uhi_model", uhi_key) != uhi_key for entry in cached.values()):
        log.info(
            "Threshold cache %s was built for a different UHI model; "
            "recomputing for %r",
            CACHE_PATH, uhi_key,
        )
        return None
    return cached


def compute_zone_thresholds(use_cache: bool = True) -> dict[str, dict[str, float]]:
    """Return per-zone trigger thresholds for the Dar es Salaam zones.

    For every zone in ``config.ZONES`` whose ``city`` is ``"Dar es Salaam"``,
    the ``"DAR-JAN"`` series from ``ERA5_PATH`` is temperature-corrected with
    the active UHI corrector, converted to WBGT day by day, and summarised.

    Args:
        use_cache: When true, return the in-process result for the current
            UHI model if present, otherwise the on-disk cache if it exists,
            is readable and was built for the current UHI model. When false,
            always recompute (the result is still written to both caches).

    Returns:
        ``{zone_id: {"alert_c", "payout_c", "n_days", "mean_wbgt_c",
        "uhi_model"}}``. ``alert_c`` / ``payout_c`` are the
        ``ALERT_PERCENTILE`` / ``PAYOUT_PERCENTILE`` percentiles of the zone's
        WBGT values, rounded to two decimals. Zones with no usable rows are
        omitted.

    Raises:
        OSError, ValueError, KeyError: if the ERA5 file is missing, is not
            valid JSON, or lacks the expected keys.
    """
    uhi_key = os.environ.get("UHI_MODEL", "synthetic").lower()
    if use_cache:
        cached = _IN_MEMORY_CACHE.get(uhi_key)
        if cached is None:
            cached = _load_disk_cache(uhi_key)
            if cached is not None:
                _IN_MEMORY_CACHE[uhi_key] = cached
        if cached is not None:
            return cached

    # Imported lazily, as in the original, so importing this module does not
    # pull in the project configuration or the downscaling package.
    from config import ZONES
    from src.downscaling import get_uhi_corrector

    era5 = json.loads(ERA5_PATH.read_text(encoding="utf-8"))
    grid_rows = era5["DAR-JAN"]
    corrector = get_uhi_corrector()

    # Parse the grid-cell series once; it is identical for every zone. Rows
    # missing either input are skipped.
    daily_inputs = []
    for row in grid_rows:
        temp = row.get("temp_max_c")
        humidity = row.get("humidity_pct")
        if temp is None or humidity is None:
            continue
        # Month is sliced from characters 5-6 of the date string
        # (presumably ISO "YYYY-MM-DD" — confirm against the ERA5 export).
        daily_inputs.append((float(temp), float(humidity), int(row["date"][5:7])))

    out: dict[str, dict[str, float]] = {}
    for zone in ZONES:
        if zone.city != "Dar es Salaam":
            continue
        wbgts = []
        for temp, humidity, month in daily_inputs:
            # hour=14 is passed for every row (presumably the afternoon peak
            # matching temp_max_c — confirm with the corrector's contract).
            corrected_t, _, _ = corrector.correct_temperature(
                zone, temp, hour=14, month=month
            )
            wbgts.append(calculate_wbgt(corrected_t, humidity))
        if not wbgts:
            continue
        arr = np.asarray(wbgts, dtype=np.float32)
        out[zone.zone_id] = {
            "alert_c": round(float(np.percentile(arr, ALERT_PERCENTILE)), 2),
            "payout_c": round(float(np.percentile(arr, PAYOUT_PERCENTILE)), 2),
            "n_days": int(arr.size),
            "mean_wbgt_c": round(float(arr.mean()), 2),
            "uhi_model": uhi_key,
        }

    CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
    CACHE_PATH.write_text(json.dumps(out, indent=2), encoding="utf-8")
    _IN_MEMORY_CACHE[uhi_key] = out
    return out
|
|
|
|
| |
# Global (alert, payout-peak) WBGT thresholds in °C, used in global mode and as
# the fallback whenever a zone-specific value is unavailable.
GLOBAL_ALERT_C = 35.1
GLOBAL_PAYOUT_PEAK_C = 36.0


def get_zone_thresholds(zone) -> Tuple[float, float]:
    """Return ``(alert_c, payout_peak_c)`` for *zone*.

    THRESHOLD_MODE=zone_specific: the zone's own calibrated thresholds.
    THRESHOLD_MODE=global (default): ``(GLOBAL_ALERT_C, GLOBAL_PAYOUT_PEAK_C)``
    regardless of zone.

    The global pair is also returned, with a logged warning, when calibration
    fails or when the zone's stored entry is malformed; it is returned without
    a warning when the zone simply has no calibrated entry.

    Args:
        zone: Object exposing a ``zone_id`` attribute.

    Returns:
        The alert and payout-peak thresholds as floats.
    """
    fallback = (GLOBAL_ALERT_C, GLOBAL_PAYOUT_PEAK_C)
    if _threshold_mode() != "zone_specific":
        return fallback
    try:
        thresholds = compute_zone_thresholds(use_cache=True)
    except Exception:
        # Best-effort by design, but never silent: a failed calibration changes
        # which thresholds are applied, so it must be visible in the logs.
        log.warning(
            "Zone threshold calibration failed — using global thresholds",
            exc_info=True,
        )
        return fallback
    zt = thresholds.get(zone.zone_id)
    if zt is None:
        return fallback
    try:
        return float(zt["alert_c"]), float(zt["payout_c"])
    except (KeyError, TypeError, ValueError):
        log.warning(
            "Malformed threshold entry for zone %r — using global thresholds",
            zone.zone_id,
            exc_info=True,
        )
        return fallback
|
|
|
|
def current_mode() -> str:
    """Return the threshold mode currently in effect.

    Public wrapper around ``_threshold_mode``; the mode is re-read on every
    call.
    """
    active_mode = _threshold_mode()
    return active_mode
|
|
|
|
def persist_to_neon(conn, thresholds: dict[str, dict[str, float]] | None = None) -> int:
    """UPSERT zone_thresholds rows into Neon so the Vercel frontend can read
    per-zone values via the ``zones`` serverless endpoint.

    Intended to be called from the pipeline after ``compute_zone_thresholds``.
    Skipped silently when THRESHOLD_MODE != zone_specific — under global mode,
    the frontend's fallback (35.1 / 36.0) is the correct display value.

    Args:
        conn: Open database connection exposing ``cursor()`` and ``commit()``;
            statements use ``%s`` placeholders.
        thresholds: Mapping as returned by ``compute_zone_thresholds``. When
            ``None`` it is obtained from ``compute_zone_thresholds``.

    Returns:
        Number of rows upserted; 0 when skipped or when there is nothing to
        write.

    Raises:
        Whatever the database driver raises on failure. The cursor is closed
        in that case too; the transaction state is left to the caller.
    """
    mode = _threshold_mode()
    if mode != "zone_specific":
        return 0
    if thresholds is None:
        thresholds = compute_zone_thresholds(use_cache=True)
    if not thresholds:
        return 0
    env_uhi_model = os.environ.get("UHI_MODEL", "synthetic").lower()
    rows = [
        (
            zid,
            float(v["alert_c"]),
            float(v["payout_c"]),
            # Prefer the model recorded with the entry, so the row is labelled
            # with the model that actually produced its values.
            str(v.get("uhi_model") or env_uhi_model),
            mode,
        )
        for zid, v in thresholds.items()
    ]
    cur = conn.cursor()
    try:
        cur.executemany(
            """
            INSERT INTO zone_thresholds
            (zone_id, alert_threshold_c, payout_threshold_c, uhi_model, threshold_mode, computed_at)
            VALUES (%s, %s, %s, %s, %s, NOW())
            ON CONFLICT (zone_id) DO UPDATE SET
            alert_threshold_c = EXCLUDED.alert_threshold_c,
            payout_threshold_c = EXCLUDED.payout_threshold_c,
            uhi_model = EXCLUDED.uhi_model,
            threshold_mode = EXCLUDED.threshold_mode,
            computed_at = NOW()
            """,
            rows,
        )
        conn.commit()
    finally:
        # Release the cursor even when the statement or the commit fails.
        cur.close()
    return len(rows)
|
|
|
|
def _run_cli() -> None:
    """Recompute zone thresholds from scratch and print them as a table."""
    import sys

    # Put the repo root first on sys.path so the lazily imported top-level
    # packages resolve when this file is executed directly.
    sys.path.insert(0, str(_REPO_ROOT))

    # Remove any existing on-disk cache before recomputing.
    if CACHE_PATH.exists():
        CACHE_PATH.unlink()

    # The CLI uses the "lst" UHI model unless the caller already chose one.
    os.environ.setdefault("UHI_MODEL", "lst")
    print(f"Computing zone thresholds under UHI_MODEL={os.environ['UHI_MODEL']}...")
    table = compute_zone_thresholds(use_cache=False)

    print(f"\n{'zone':9s} {'alert':>7s} {'payout':>7s} {'mean':>7s} {'n':>6s}")
    for zone_id in sorted(table):
        stats = table[zone_id]
        print(f"{zone_id:9s} {stats['alert_c']:>7.2f} {stats['payout_c']:>7.2f} "
              f"{stats['mean_wbgt_c']:>7.2f} {stats['n_days']:>6d}")
    print(f"\nGlobal reference (THRESHOLD_MODE=global): alert={GLOBAL_ALERT_C} payout={GLOBAL_PAYOUT_PEAK_C}")
    print(f"\nCached at: {CACHE_PATH}")


if __name__ == "__main__":
    _run_cli()
|
|