# predict-power / iso_ne_fetch.py
# jeffliulab's picture
# Real HRRR + true per-zone ISO-NE + 7-day rolling backtest from data repo
# a8fbd60 verified
"""
High-level ISO-NE per-zone demand fetcher for the Space.
Wraps the low-level fetcher in ``iso_ne_zonal.py`` with:
- In-memory cache (5-minute TTL) so repeated clicks within a few
minutes don't refetch from ISO-NE
- Optional bundled CSV fallback for offline / API-down scenarios
- Optional integration with a long-history CSV pulled from the data
repo at Space startup (used to seed Chronos context without
re-fetching 30 days of ISO-NE on every click)
Public API kept stable so ``app.py`` can swap from the old EIA-based
implementation without further changes:
- ``ZONE_COLS`` : list of 8 zone names
- ``fetch_recent_demand_mwh(end_dt)`` : (24, 8) MWh + source label
- ``fetch_long_history_mwh(end_dt, hours=720)`` : (hours, 8) MWh + label
"""
from __future__ import annotations
import logging
import os
from datetime import datetime, timedelta, timezone
from io import StringIO
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
import requests
from iso_ne_zonal import ZONE_COLS, fetch_range, fetch_recent_hours
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Bundled fallback CSVs live in an ``assets`` directory next to this file.
ASSETS_DIR = Path(__file__).parent / "assets"
# Short sample: the loaders in this module read its last 24 rows.
SAMPLE_CSV = ASSETS_DIR / "sample_demand_2022.csv"
# Optional longer sample; its existence is checked before it is read.
SAMPLE_CSV_LONG = ASSETS_DIR / "sample_demand_2022_long.csv"
# In-memory cache: { ("recent", end_hour) | ("long", end_hour, hours) -> (ts, np.ndarray) }
# ``ts`` is the timezone-aware UTC time at which the entry was stored.
_CACHE: dict = {}
# Entries older than this many seconds are treated as misses (300 s = 5 min).
_CACHE_TTL_SECONDS = 300
# Path of the data-repo 30-day CSV (refreshed daily by GitHub Actions in
# new-england-real-time-power-predict-data; downloaded by app.py at
# startup and saved to /tmp). When present, fetch_long_history_mwh
# uses it as the base and splices in the last 1-2 days from live API.
# The location can be overridden with the DATA_REPO_30D_CSV_PATH env var.
DATA_REPO_30D_CSV_PATH = Path(os.environ.get(
"DATA_REPO_30D_CSV_PATH", "/tmp/iso_ne_30d.csv"))
def _hour_floor_utc(dt: datetime) -> datetime:
    """Truncate ``dt`` to the top of its hour, expressed as naive UTC.

    A naive input is taken to already be in UTC; an aware input is first
    converted to UTC. The result has minute, second and microsecond zeroed
    and carries no ``tzinfo``.
    """
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
    in_utc = aware.astimezone(timezone.utc)
    on_the_hour = in_utc.replace(minute=0, second=0, microsecond=0)
    # Strip the timezone last so callers get a naive value.
    return on_the_hour.replace(tzinfo=None)
def _cache_get(key: tuple) -> Optional[np.ndarray]:
    """Look up ``key`` in the module cache, honouring the TTL.

    Returns a copy of the stored array when an entry exists and is younger
    than ``_CACHE_TTL_SECONDS``; returns ``None`` when the key is absent or
    the entry has expired. A copy is returned so callers cannot mutate the
    cached array.
    """
    entry = _CACHE.get(key)
    if entry is not None:
        stored_at, payload = entry
        age_seconds = (datetime.now(timezone.utc) - stored_at).total_seconds()
        if age_seconds < _CACHE_TTL_SECONDS:
            return payload.copy()
    return None
def _cache_put(key: tuple, arr: np.ndarray) -> None:
    """Store a copy of ``arr`` under ``key``, stamped with the current UTC time.

    Before inserting, every entry whose age has reached
    ``_CACHE_TTL_SECONDS`` is dropped. Cache keys built in this module embed
    the hour-floored end time, so without this purge a long-running process
    would accumulate one dead entry per key per hour indefinitely.

    Args:
        key: Hashable cache key.
        arr: Array to cache; a copy is stored so later caller-side mutation
            does not affect the cached value.
    """
    now = datetime.now(timezone.utc)
    # Snapshot the items so the dict can be mutated while scanning.
    for old_key, (stored_at, _) in list(_CACHE.items()):
        if (now - stored_at).total_seconds() >= _CACHE_TTL_SECONDS:
            _CACHE.pop(old_key, None)
    _CACHE[key] = (now, arr.copy())
def _load_sample_recent() -> np.ndarray:
    """Read the last 24 rows of the bundled sample CSV as float32.

    Only the ``ZONE_COLS`` columns are kept.

    Raises:
        RuntimeError: if the resulting array is not shaped ``(24, 8)``.
    """
    frame = pd.read_csv(SAMPLE_CSV)
    last_rows = frame[ZONE_COLS].tail(24)
    values = last_rows.to_numpy(dtype=np.float32)
    if values.shape == (24, 8):
        return values
    raise RuntimeError(
        f"Bundled sample_demand_2022.csv has wrong shape {values.shape}")
def _load_sample_long(hours: int) -> np.ndarray:
    """Return ``hours`` rows of bundled sample demand as float32.

    Uses the tail of the long sample CSV when that file exists and yields
    exactly ``(hours, 8)``. Otherwise the 24-row short sample is repeated
    end to end and cut to ``hours`` rows.
    """
    if SAMPLE_CSV_LONG.exists():
        frame = pd.read_csv(SAMPLE_CSV_LONG)
        tail = frame[ZONE_COLS].tail(hours).to_numpy(dtype=np.float32)
        if tail.shape == (hours, 8):
            return tail
    # Long sample missing or too short: tile the 24-row sample instead.
    one_day = _load_sample_recent()
    repeats = hours // 24 + 1
    tiled = np.tile(one_day, (repeats, 1))
    return tiled[:hours].astype(np.float32)
def fetch_recent_demand_mwh(end_dt: Optional[datetime] = None
                            ) -> tuple[np.ndarray, str]:
    """Return per-zone demand for the 24 hours ending at ``end_dt`` (or now).

    ``end_dt`` is floored to the hour in UTC before use. Results of a
    successful live fetch are cached under that hour.

    Args:
        end_dt: End of the window; defaults to the current UTC time.

    Returns:
        ``(array, label)``. ``array`` comes from the cache, from
        ``fetch_recent_hours``, or from the bundled sample CSV (shape
        ``(24, 8)`` in the sample case). ``label`` is one of:

        - ``"cached"``
        - ``"live (ISO-NE 5-min zonal)"``
        - ``"live (ISO-NE 5-min zonal, latest hour <iso>, lag <n>h)"``
          when the newest hour returned is older than ``end_dt``
        - ``"sample-2022 (ISO-NE unreachable)"``
    """
    if end_dt is None:
        end_dt = datetime.now(timezone.utc)
    end_dt = _hour_floor_utc(end_dt)
    cache_key = ("recent", end_dt)
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached, "cached"
    try:
        # NOTE(review): `latest` is presumably a naive-UTC datetime for the
        # newest hour returned -- confirm against iso_ne_zonal.
        arr, latest = fetch_recent_hours(end_dt, hours=24)
        _cache_put(cache_key, arr)
        lag_hours = (end_dt - latest).total_seconds() / 3600
        # Build the label in one piece per branch. The previous
        # `label += A if cond else B` appended the short label to a
        # half-finished long one whenever there was no lag.
        if lag_hours > 0:
            label = (f"live (ISO-NE 5-min zonal, latest hour "
                     f"{latest.isoformat()}, lag {lag_hours:.0f}h)")
        else:
            label = "live (ISO-NE 5-min zonal)"
        return arr, label
    except Exception as e:  # noqa: BLE001
        # Deliberate best-effort: any live-path failure degrades to the sample.
        logger.warning("ISO-NE realtime fetch failed: %s; falling back to bundled CSV", e)
        return _load_sample_recent(), "sample-2022 (ISO-NE unreachable)"
def _load_30d_base() -> Optional[pd.DataFrame]:
    """Load data-repo's pre-built 30-day per-zone CSV if available.

    The ``timestamp_utc`` column becomes the index, normalised to naive UTC
    so it can be compared with the naive-UTC datetimes produced by
    ``_hour_floor_utc``. Naive values are treated as UTC and offset-carrying
    values are converted. Duplicate timestamps keep their last occurrence in
    file order, since a later ``reindex`` would raise on duplicate labels.

    Returns:
        A DataFrame with the ``ZONE_COLS`` columns, sorted by timestamp, or
        ``None`` when the file is absent or cannot be loaded (the failure is
        logged as a warning).
    """
    if not DATA_REPO_30D_CSV_PATH.exists():
        return None
    try:
        df = pd.read_csv(DATA_REPO_30D_CSV_PATH)
        stamps = pd.to_datetime(df["timestamp_utc"], utc=True)
        df.index = pd.DatetimeIndex(stamps.dt.tz_localize(None),
                                    name="timestamp_utc")
        # De-duplicate before sorting so "last" means last in the file.
        df = df[~df.index.duplicated(keep="last")].sort_index()
        return df[ZONE_COLS]
    except Exception as e:  # noqa: BLE001
        logger.warning("Failed to load 30d base CSV at %s: %s",
                       DATA_REPO_30D_CSV_PATH, e)
        return None
def fetch_long_history_mwh(end_dt: Optional[datetime] = None,
                           hours: int = 720
                           ) -> tuple[np.ndarray, str]:
    """Return ``(hours, 8)`` MWh of per-zone history ending at ``end_dt - 1h``.

    Strategy:
    1. If the data repo's 30d base CSV is missing, return bundled sample
       data.
    2. Otherwise merge the last two days from the live ISO-NE API into the
       base. Live values win on overlap, and live hours newer than the CSV
       are appended so the tail is fresh. A failed live fetch is logged
       and the base is used as is.
    3. If the merged data is empty or starts after the window start, return
       bundled sample data for the whole window.
    4. Otherwise slice the exact hourly window; gaps are time-interpolated
       (up to 12 consecutive hours), then forward/back filled.

    Args:
        end_dt: End of the window (exclusive); floored to the hour in UTC.
            Defaults to the current UTC time.
        hours: Number of hourly rows to return.

    Returns:
        ``(array, label)`` where ``label`` is one of ``"cached"``,
        ``"sample-2022 (no data-repo CSV)"``,
        ``"sample-2022 (data-repo CSV too short)"``, ``"data-repo 30d"`` or
        ``"data-repo 30d + live splice"``.
    """
    if end_dt is None:
        end_dt = datetime.now(timezone.utc)
    end_dt = _hour_floor_utc(end_dt)
    cache_key = ("long", end_dt, hours)
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached, "cached"

    target_end = end_dt - timedelta(hours=1)  # last hour we want
    target_start = target_end - timedelta(hours=hours - 1)

    base = _load_30d_base()
    if base is None:
        out = _load_sample_long(hours)
        _cache_put(cache_key, out)
        return out, "sample-2022 (no data-repo CSV)"

    label = "data-repo 30d"
    # Try to splice live ISO-NE for the last 2 days for freshness.
    try:
        # NOTE(review): assumes fetch_range(..., hourly=True) returns a
        # DataFrame-like object indexed by naive-UTC hourly timestamps with
        # the ZONE_COLS columns -- confirm against iso_ne_zonal.
        live = fetch_range(target_end - timedelta(days=2), target_end,
                           hourly=True)
        live = pd.DataFrame(live).reindex(columns=ZONE_COLS)
        # DataFrame.update() only overwrites rows already present in `base`,
        # so hours newer than the CSV were silently dropped. combine_first
        # keeps live's non-NA values and also adds its new rows.
        merged = live.combine_first(base)
        if (not isinstance(merged.index, pd.DatetimeIndex)
                or merged.index.tz is not None):
            raise TypeError("live index is not naive-UTC timestamps")
        base = merged
        label += " + live splice"
    except Exception as e:  # noqa: BLE001
        logger.info("Live splice into long history failed: %s", e)

    if base.empty or base.index.min() > target_start:
        # The base cannot cover the start of the window. The whole window is
        # served from the bundled sample, so the label must say so.
        logger.info("30d base starts at %s, missing %s -> %s; "
                    "using bundled sample for the whole window",
                    base.index.min(), target_start, base.index.min())
        out = _load_sample_long(hours)
        label = "sample-2022 (data-repo CSV too short)"
    else:
        # Slice the exact window; reindex inserts NaN rows for missing hours.
        idx = pd.date_range(start=target_start, end=target_end, freq="1h")
        sliced = base.reindex(idx)
        if sliced.isna().any().any():
            logger.info("30d base has %d NaN rows in window; interpolating",
                        int(sliced.isna().any(axis=1).sum()))
            sliced = sliced.interpolate(method="time", limit=12).ffill().bfill()
        out = sliced[ZONE_COLS].to_numpy(dtype=np.float32)

    _cache_put(cache_key, out)
    return out, label