Spaces:
Sleeping
Sleeping
| """ | |
| High-level ISO-NE per-zone demand fetcher for the Space. | |
| Wraps the low-level fetcher in ``iso_ne_zonal.py`` with: | |
| - In-memory cache (5-minute TTL) so repeated clicks within a few | |
| minutes don't refetch from ISO-NE | |
| - Optional bundled CSV fallback for offline / API-down scenarios | |
| - Optional integration with a long-history CSV pulled from the data | |
| repo at Space startup (used to seed Chronos context without | |
| re-fetching 30 days of ISO-NE on every click) | |
| Public API kept stable so ``app.py`` can swap from the old EIA-based | |
| implementation without further changes: | |
| - ``ZONE_COLS`` : list of 8 zone names | |
| - ``fetch_recent_demand_mwh(end_dt)`` : (24, 8) MWh + source label | |
| - ``fetch_long_history_mwh(end_dt, hours=720)`` : (hours, 8) MWh + label | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| from datetime import datetime, timedelta, timezone | |
| from io import StringIO | |
| from pathlib import Path | |
| from typing import Optional | |
| import numpy as np | |
| import pandas as pd | |
| import requests | |
| from iso_ne_zonal import ZONE_COLS, fetch_range, fetch_recent_hours | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Bundled fallback CSVs live in an ``assets`` directory next to this file.
ASSETS_DIR = Path(__file__).parent.joinpath("assets")
SAMPLE_CSV = ASSETS_DIR.joinpath("sample_demand_2022.csv")
SAMPLE_CSV_LONG = ASSETS_DIR.joinpath("sample_demand_2022_long.csv")

# In-memory cache. Keys are ("recent", end_hour) or ("long", end_hour, hours);
# values are (insertion timestamp, np.ndarray) pairs.
_CACHE: dict = {}
_CACHE_TTL_SECONDS = 300

# Path of the data-repo 30-day CSV (refreshed daily by GitHub Actions in
# new-england-real-time-power-predict-data; downloaded by app.py at
# startup and saved to /tmp). When present, fetch_long_history_mwh
# uses it as the base and splices in the last 1-2 days from live API.
# The location can be overridden with the DATA_REPO_30D_CSV_PATH env var.
DATA_REPO_30D_CSV_PATH = Path(
    os.getenv("DATA_REPO_30D_CSV_PATH", "/tmp/iso_ne_30d.csv"))
| def _hour_floor_utc(dt: datetime) -> datetime: | |
| if dt.tzinfo is None: | |
| dt = dt.replace(tzinfo=timezone.utc) | |
| return dt.astimezone(timezone.utc).replace( | |
| minute=0, second=0, microsecond=0, tzinfo=None) | |
def _cache_get(key: tuple) -> Optional[np.ndarray]:
    """Look up ``key`` in the module-level cache.

    Returns a copy of the stored array when an entry exists and is younger
    than ``_CACHE_TTL_SECONDS``; returns ``None`` when the key is absent or
    the entry has expired. A copy is handed out so that callers cannot
    mutate the cached array.
    """
    entry = _CACHE.get(key)
    if entry is None:
        return None
    stored_at, payload = entry
    age_seconds = (datetime.now(timezone.utc) - stored_at).total_seconds()
    if age_seconds >= _CACHE_TTL_SECONDS:
        return None
    return payload.copy()
def _cache_put(key: tuple, arr: np.ndarray) -> None:
    """Store a copy of ``arr`` under ``key`` and drop expired entries.

    Args:
        key: Cache key tuple.
        arr: Array to cache. A copy is stored so later mutation by the
            caller does not affect the cached value.
    """
    now = datetime.now(timezone.utc)
    # Cache keys embed the requested end hour, so entries for past hours are
    # never looked up again. Without pruning they would accumulate for the
    # whole life of the process.
    # list(...) snapshots the items so a concurrent insert cannot break the
    # iteration; pop(..., None) tolerates a concurrent removal.
    for old_key, (stored_at, _) in list(_CACHE.items()):
        if (now - stored_at).total_seconds() >= _CACHE_TTL_SECONDS:
            _CACHE.pop(old_key, None)
    _CACHE[key] = (now, arr.copy())
def _load_sample_recent() -> np.ndarray:
    """Read the last 24 rows of the bundled sample CSV as a float32 array.

    Only the ``ZONE_COLS`` columns are kept.

    Raises:
        RuntimeError: if the resulting array is not shaped ``(24, 8)``.
    """
    frame = pd.read_csv(SAMPLE_CSV)
    last_day = frame[ZONE_COLS].tail(24)
    values = last_day.to_numpy(dtype=np.float32)
    if values.shape == (24, 8):
        return values
    raise RuntimeError(
        f"Bundled sample_demand_2022.csv has wrong shape {values.shape}")
def _load_sample_long(hours: int) -> np.ndarray:
    """Return ``hours`` rows of bundled sample demand as float32.

    Uses the last ``hours`` rows of the long sample CSV when that file
    exists and yields exactly ``(hours, 8)``. Otherwise the 24-row sample
    from ``_load_sample_recent`` is repeated end-to-end and cut to length.
    """
    if SAMPLE_CSV_LONG.exists():
        long_frame = pd.read_csv(SAMPLE_CSV_LONG)
        window = long_frame[ZONE_COLS].tail(hours).to_numpy(dtype=np.float32)
        if window.shape == (hours, 8):
            return window
    # Long file missing or too short: tile the one-day sample instead.
    one_day = _load_sample_recent()
    repeats = hours // 24 + 1
    tiled = np.tile(one_day, (repeats, 1))
    return tiled[:hours].astype(np.float32)
def fetch_recent_demand_mwh(end_dt: Optional[datetime] = None
                            ) -> tuple[np.ndarray, str]:
    """Return ``(24, 8)`` MWh for the most recent 24 contiguous hours
    ending at ``end_dt`` (or now), together with a source label.

    Args:
        end_dt: End of the window. Defaults to the current time. The value
            is floored to the hour in UTC; a naive datetime is treated as
            UTC.

    Returns:
        ``(arr, label)`` where ``label`` is one of:

        - ``"cached"``
        - ``"live (ISO-NE 5-min zonal, latest hour <iso>, lag <N>h)"``
          when the newest hour returned is older than ``end_dt``
        - ``"live (ISO-NE 5-min zonal)"`` when there is no lag
        - ``"sample-2022 (ISO-NE unreachable)"`` when the live fetch fails
    """
    if end_dt is None:
        end_dt = datetime.now(timezone.utc)
    end_dt = _hour_floor_utc(end_dt)

    cache_key = ("recent", end_dt)
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached, "cached"

    try:
        arr, latest = fetch_recent_hours(end_dt, hours=24)
        _cache_put(cache_key, arr)
        lag_hours = (end_dt - latest).total_seconds() / 3600
        # The two labels are alternatives. The original appended the no-lag
        # label onto the unterminated "latest hour ..." prefix, producing a
        # garbled string whenever lag_hours <= 0.
        if lag_hours > 0:
            label = (f"live (ISO-NE 5-min zonal, latest hour "
                     f"{latest.isoformat()}, lag {lag_hours:.0f}h)")
        else:
            label = "live (ISO-NE 5-min zonal)"
        return arr, label
    except Exception as e:  # noqa: BLE001
        # Deliberate best-effort: any failure falls back to bundled data.
        # NOTE(review): this also covers a failure while computing the lag
        # (e.g. if `latest` were tz-aware) — confirm that is intended.
        logger.warning("ISO-NE realtime fetch failed: %s; falling back to bundled CSV", e)
        return _load_sample_recent(), "sample-2022 (ISO-NE unreachable)"
def _load_30d_base() -> Optional[pd.DataFrame]:
    """Load data-repo's pre-built 30-day per-zone CSV if available.

    Returns:
        A frame holding the ``ZONE_COLS`` columns, indexed by the
        ``timestamp_utc`` column as sorted, unique, tz-naive UTC
        timestamps. ``None`` when the file is missing, cannot be read or
        parsed, or contains no usable rows.
    """
    if not DATA_REPO_30D_CSV_PATH.exists():
        return None
    try:
        df = pd.read_csv(DATA_REPO_30D_CSV_PATH, parse_dates=["timestamp_utc"])
        df = df.set_index("timestamp_utc")
        # Callers compare this index against naive-UTC datetimes. Normalise
        # here so a CSV written with explicit offsets ("...+00:00" / "Z")
        # does not raise a naive-vs-aware TypeError downstream.
        df.index = pd.to_datetime(df.index, utc=True).tz_localize(None)
        df = df[df.index.notna()]
        # reindex() rejects duplicate labels; keep the last row per hour.
        df = df[~df.index.duplicated(keep="last")].sort_index()
        base = df[ZONE_COLS]
        if base.empty:
            logger.warning("30d base CSV at %s has no usable rows",
                           DATA_REPO_30D_CSV_PATH)
            return None
        return base.copy()
    except Exception as e:  # noqa: BLE001
        logger.warning("Failed to load 30d base CSV at %s: %s",
                       DATA_REPO_30D_CSV_PATH, e)
        return None
def _splice_live(base: pd.DataFrame, live) -> pd.DataFrame:
    """Overlay ``live`` on ``base``: live values win and new hours are added.

    Raises:
        ValueError: if ``live`` (or the merged result) is not indexed by
            timestamps, so the caller can skip the splice.
    """
    live_df = pd.DataFrame(live)
    if not isinstance(live_df.index, pd.DatetimeIndex):
        raise ValueError("live frame is not indexed by timestamp")
    if live_df.index.tz is not None:
        # The target window is built from naive-UTC datetimes.
        live_df = live_df.tz_convert("UTC").tz_localize(None)
    # DataFrame.update() only touches rows already present in `base`, which
    # silently drops every live hour newer than the daily CSV.
    # combine_first() keeps the union of both indexes.
    merged = live_df.combine_first(base)
    if not isinstance(merged.index, pd.DatetimeIndex):
        raise ValueError("live and base indexes are not compatible")
    return merged


def fetch_long_history_mwh(end_dt: Optional[datetime] = None,
                           hours: int = 720
                           ) -> tuple[np.ndarray, str]:
    """Return ``(hours, 8)`` MWh of per-zone history ending at ``end_dt - 1h``.

    Strategy:

    1. A fresh cache entry for ``(end_dt, hours)`` is returned as-is.
    2. If the data repo's 30d base CSV is unavailable, return the bundled
       long-history sample (no live splice is attempted).
    3. Otherwise splice the last two days from the live ISO-NE API over
       the base (best-effort; a failure is logged and the base is used
       alone).
    4. If the base starts after the requested window, or gaps remain
       after interpolation, fall back to the bundled sample.

    Args:
        end_dt: End of the window. Defaults to the current time. The value
            is floored to the hour in UTC; a naive datetime is treated as
            UTC.
        hours: Number of hourly rows to return.

    Returns:
        ``(arr, label)`` where ``label`` names the data source actually
        used: ``"cached"``, ``"data-repo 30d"`` (optionally followed by
        ``" + live splice"``), or a ``"sample-2022 (...)"`` variant.
    """
    if end_dt is None:
        end_dt = datetime.now(timezone.utc)
    end_dt = _hour_floor_utc(end_dt)

    cache_key = ("long", end_dt, hours)
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached, "cached"

    target_end = end_dt - timedelta(hours=1)  # last hour we want
    target_start = target_end - timedelta(hours=hours - 1)

    base = _load_30d_base()
    if base is None or base.empty:
        out = _load_sample_long(hours)
        _cache_put(cache_key, out)
        return out, "sample-2022 (no data-repo CSV)"

    # Try to splice live ISO-NE for the last 2 days for freshness.
    splice_label = ""
    try:
        live = fetch_range(target_end - timedelta(days=2), target_end,
                           hourly=True)
        base = _splice_live(base, live)
        splice_label = " + live splice"
    except Exception as e:  # noqa: BLE001
        logger.info("Live splice into long history failed: %s", e)

    label = "data-repo 30d" + splice_label
    out = None
    if base.index.min() > target_start:
        # The base does not reach back far enough; the bundled sample
        # replaces the whole window, and the label must say so.
        logger.info("30d base starts at %s, missing %s -> %s; "
                    "using bundled sample instead",
                    base.index.min(), target_start, base.index.min())
        label = "sample-2022 (data-repo CSV too short)"
    else:
        # Slice the exact window.
        idx = pd.date_range(start=target_start, end=target_end, freq="1h")
        sliced = base[ZONE_COLS].reindex(idx)
        if sliced.isna().any().any():
            logger.info("30d base has %d NaN rows in window; interpolating",
                        int(sliced.isna().any(axis=1).sum()))
            sliced = sliced.interpolate(method="time", limit=12).ffill().bfill()
        if sliced.isna().any().any():
            # e.g. a zone column with no data at all in the window.
            logger.warning("30d base still has NaNs after gap filling; "
                           "using bundled sample instead")
            label = "sample-2022 (data-repo CSV has unfillable gaps)"
        else:
            out = sliced.to_numpy(dtype=np.float32)

    if out is None:
        out = _load_sample_long(hours)
    _cache_put(cache_key, out)
    return out, label