"""
High-level ISO-NE per-zone demand fetcher for the Space.
Wraps the low-level fetcher in ``iso_ne_zonal.py`` with:
- In-memory cache (5-minute TTL) so repeated clicks within a few
minutes don't refetch from ISO-NE
- Optional bundled CSV fallback for offline / API-down scenarios
- Optional integration with a long-history CSV pulled from the data
repo at Space startup (used to seed Chronos context without
re-fetching 30 days of ISO-NE on every click)
Public API kept stable so ``app.py`` can swap from the old EIA-based
implementation without further changes:
- ``ZONE_COLS`` : list of 8 zone names
- ``fetch_recent_demand_mwh(end_dt)`` : (24, 8) MWh + source label
- ``fetch_long_history_mwh(end_dt, hours=720)`` : (hours, 8) MWh + label
"""
from __future__ import annotations
import logging
import os
from datetime import datetime, timedelta, timezone
from io import StringIO
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
import requests
from iso_ne_zonal import ZONE_COLS, fetch_range, fetch_recent_hours
logger = logging.getLogger(__name__)

# Bundled fallback data that ships in the ``assets`` directory next to
# this module.
ASSETS_DIR = Path(__file__).parent.joinpath("assets")
SAMPLE_CSV = ASSETS_DIR.joinpath("sample_demand_2022.csv")
SAMPLE_CSV_LONG = ASSETS_DIR.joinpath("sample_demand_2022_long.csv")

# In-memory cache. Keys are ("recent", end_hour) or ("long", end_hour, hours);
# each value is a (ts, np.ndarray) pair where ``ts`` is the insertion time.
_CACHE: dict = dict()
_CACHE_TTL_SECONDS = 5 * 60  # entries older than five minutes are ignored

# Location of the data-repo 30-day CSV (refreshed daily by GitHub Actions in
# new-england-real-time-power-predict-data; downloaded by app.py at startup
# and saved to /tmp). When the file is present, fetch_long_history_mwh uses
# it as the base and splices in the last 1-2 days from the live API.
# Override the location with the DATA_REPO_30D_CSV_PATH environment variable.
DATA_REPO_30D_CSV_PATH = Path(
    os.getenv("DATA_REPO_30D_CSV_PATH", "/tmp/iso_ne_30d.csv"))
def _hour_floor_utc(dt: datetime) -> datetime:
    """Truncate ``dt`` to the start of its hour, expressed as naive UTC.

    A naive ``dt`` is taken to already be in UTC; an aware ``dt`` is
    converted to UTC first. The returned datetime carries no ``tzinfo``.
    """
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
    in_utc = aware.astimezone(timezone.utc)
    # Strip tzinfo last so the conversion above happens on an aware value.
    return in_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
def _cache_get(key: tuple) -> Optional[np.ndarray]:
    """Return a copy of the array cached under ``key`` if it is still fresh.

    Returns ``None`` when nothing is stored for ``key`` or when the stored
    entry is at least ``_CACHE_TTL_SECONDS`` old. A copy is handed out so
    callers cannot mutate the cached array.
    """
    entry = _CACHE.get(key)
    if entry is not None:
        stored_at, payload = entry
        age_seconds = (datetime.now(timezone.utc) - stored_at).total_seconds()
        if age_seconds < _CACHE_TTL_SECONDS:
            return payload.copy()
    return None
def _cache_put(key: tuple, arr: np.ndarray) -> None:
    """Store a private copy of ``arr`` under ``key`` and drop expired entries.

    Args:
        key: Cache key tuple.
        arr: Array to cache; it is copied so later mutation by the caller
            does not affect the cached value.
    """
    now = datetime.now(timezone.utc)
    # Cache keys embed the request hour, so a key from a past hour is never
    # looked up again. Without this sweep every such entry would stay in
    # memory for the lifetime of the process.
    expired = [
        stale_key
        for stale_key, (stored_at, _) in list(_CACHE.items())
        if (now - stored_at).total_seconds() >= _CACHE_TTL_SECONDS
    ]
    for stale_key in expired:
        _CACHE.pop(stale_key, None)
    _CACHE[key] = (now, arr.copy())
def _load_sample_recent() -> np.ndarray:
    """Return the last 24 rows of the bundled sample CSV as float32.

    Only the ``ZONE_COLS`` columns are kept.

    Raises:
        RuntimeError: if the resulting array is not shaped ``(24, 8)``.
    """
    frame = pd.read_csv(SAMPLE_CSV)
    last_day = frame[ZONE_COLS].tail(24).to_numpy(dtype=np.float32)
    if last_day.shape == (24, 8):
        return last_day
    raise RuntimeError(
        f"Bundled sample_demand_2022.csv has wrong shape {last_day.shape}")
def _load_sample_long(hours: int) -> np.ndarray:
    """Return ``hours`` rows of bundled sample demand as float32.

    Uses the tail of the long sample CSV when that file exists and yields
    exactly ``(hours, 8)``. Otherwise the 24-row short sample is repeated
    end to end and cut to ``hours`` rows.
    """
    if SAMPLE_CSV_LONG.exists():
        frame = pd.read_csv(SAMPLE_CSV_LONG)
        window = frame[ZONE_COLS].tail(hours).to_numpy(dtype=np.float32)
        if window.shape == (hours, 8):
            return window
    # Long sample missing or too short: tile the one-day sample instead.
    one_day = _load_sample_recent()
    repeats = hours // 24 + 1
    tiled = np.tile(one_day, (repeats, 1))
    return tiled[:hours].astype(np.float32)
def fetch_recent_demand_mwh(end_dt: Optional[datetime] = None
                            ) -> tuple[np.ndarray, str]:
    """Return ``(24, 8)`` MWh for the most recent 24 contiguous hours
    ending at ``end_dt`` (or now), together with a source label.

    Args:
        end_dt: End of the window; defaults to the current time. It is
            floored to the hour in UTC (a naive value is treated as UTC).

    Returns:
        ``(array, label)`` where ``label`` is one of:
          - ``"cached"`` -- served from the in-memory cache
          - ``"live (ISO-NE 5-min zonal, latest hour <iso>, lag <N>h)"`` --
            live data whose latest hour is older than ``end_dt``
          - ``"live (ISO-NE 5-min zonal)"`` -- live data with no lag
          - ``"sample-2022 (ISO-NE unreachable)"`` -- bundled fallback,
            used when the live fetch raises
    """
    if end_dt is None:
        end_dt = datetime.now(timezone.utc)
    end_dt = _hour_floor_utc(end_dt)

    cache_key = ("recent", end_dt)
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached, "cached"

    try:
        arr, latest = fetch_recent_hours(end_dt, hours=24)
        _cache_put(cache_key, arr)
        lag_hours = (end_dt - latest).total_seconds() / 3600
        # Build the two label variants separately. The previous
        # ``label += a if cond else b`` appended the short label to the
        # unfinished long prefix whenever there was no lag.
        if lag_hours > 0:
            label = (
                f"live (ISO-NE 5-min zonal, latest hour {latest.isoformat()}, "
                f"lag {lag_hours:.0f}h)"
            )
        else:
            label = "live (ISO-NE 5-min zonal)"
        return arr, label
    except Exception as e:  # noqa: BLE001
        # Deliberate best-effort: any live-fetch failure falls back to the
        # bundled sample so the caller still gets data.
        logger.warning("ISO-NE realtime fetch failed: %s; falling back to bundled CSV", e)
        return _load_sample_recent(), "sample-2022 (ISO-NE unreachable)"
def _load_30d_base() -> Optional[pd.DataFrame]:
    """Load the data repo's pre-built 30-day per-zone CSV if available.

    Returns:
        A DataFrame holding the ``ZONE_COLS`` columns, indexed by the CSV's
        ``timestamp_utc`` column as sorted, de-duplicated, tz-naive UTC
        timestamps; or ``None`` when the file is missing or cannot be
        loaded (the failure is logged as a warning).
    """
    if not DATA_REPO_30D_CSV_PATH.exists():
        return None
    try:
        df = pd.read_csv(DATA_REPO_30D_CSV_PATH, parse_dates=["timestamp_utc"])
        df = df.set_index("timestamp_utc")
        # Timestamps written with an explicit offset parse as tz-aware, but
        # the rest of this module works in naive UTC (see _hour_floor_utc).
        # Normalise so index comparisons and reindexing do not fail.
        if getattr(df.index, "tz", None) is not None:
            df.index = df.index.tz_convert(None)
        # Duplicate timestamps would make a later ``reindex`` raise; keep the
        # row that appears last in the file.
        df = df[~df.index.duplicated(keep="last")].sort_index()
        return df[ZONE_COLS]
    except Exception as e:  # noqa: BLE001
        logger.warning("Failed to load 30d base CSV at %s: %s",
                       DATA_REPO_30D_CSV_PATH, e)
        return None
def fetch_long_history_mwh(end_dt: Optional[datetime] = None,
                           hours: int = 720
                           ) -> tuple[np.ndarray, str]:
    """Return ``(hours, 8)`` MWh of per-zone history ending at ``end_dt - 1h``.

    Strategy:
      1. Serve a fresh in-memory cache entry if one exists.
      2. If the data repo's 30d base CSV is missing or empty, return the
         bundled long-history sample.
      3. Otherwise merge the last two days fetched live from ISO-NE into
         the base. Live values win on overlap, and hours the base does not
         contain yet are added, so the tail is fresh.
      4. Hours before the base's first row are padded from the bundled
         sample; remaining gaps are time-interpolated (up to 12 hours)
         and then forward/backward filled.
      5. If NaNs still remain, fall back to the bundled sample entirely.

    Args:
        end_dt: End of the window (exclusive); defaults to now. Floored to
            the hour in UTC.
        hours: Number of hourly rows to return.

    Returns:
        ``(array, label)``. ``label`` is ``"cached"``, a ``"sample-2022
        (...)"`` string, or ``"data-repo 30d"`` optionally followed by
        ``" + live splice"`` and/or ``" + sample-2022 padding"``.
    """
    if end_dt is None:
        end_dt = datetime.now(timezone.utc)
    end_dt = _hour_floor_utc(end_dt)

    cache_key = ("long", end_dt, hours)
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached, "cached"

    target_end = end_dt - timedelta(hours=1)  # last hour we want
    target_start = target_end - timedelta(hours=hours - 1)

    base = _load_30d_base()
    if base is None or base.empty:
        out = _load_sample_long(hours)
        _cache_put(cache_key, out)
        return out, "sample-2022 (no data-repo CSV)"

    label = "data-repo 30d"

    # Try to splice live ISO-NE for the last 2 days for freshness.
    try:
        live = fetch_range(target_end - timedelta(days=2), target_end,
                           hourly=True)
        # ``DataFrame.update`` only overwrites rows already present in
        # ``base``; hours newer than the CSV would be dropped and later
        # forward-filled. ``combine_first`` keeps live values where present
        # and also adds the new hours.
        # NOTE(review): assumes fetch_range returns a DataFrame indexed by
        # naive-UTC hourly timestamps like ``base`` -- confirm against
        # iso_ne_zonal.
        base = live.combine_first(base)
        label += " + live splice"
    except Exception as e:  # noqa: BLE001
        logger.info("Live splice into long history failed: %s", e)

    # Slice the exact window; hours absent from ``base`` become NaN rows.
    idx = pd.date_range(start=target_start, end=target_end, freq="1h")
    sliced = base.reindex(index=idx, columns=ZONE_COLS)

    base_start = base.index.min()
    if base_start > target_start:
        # Base does not reach back far enough: pad only the missing leading
        # hours from the bundled sample and keep the real data for the rest.
        logger.info("30d base starts at %s, missing %s -> %s; padding from sample",
                    base_start, target_start, base_start)
        head = np.asarray(idx < base_start)
        sliced.loc[head, :] = _load_sample_long(hours)[head]
        label += " + sample-2022 padding"

    gap_rows = int(sliced.isna().any(axis=1).sum())
    if gap_rows:
        logger.info("30d base has %d NaN rows in window; interpolating",
                    gap_rows)
        sliced = sliced.interpolate(method="time", limit=12).ffill().bfill()

    if sliced.isna().any().any():
        # Filling could not repair the window (e.g. a zone column with no
        # values at all); do not return NaNs to the caller.
        logger.warning("30d base still has NaNs after filling; using bundled sample")
        out = _load_sample_long(hours)
        label = "sample-2022 (data-repo CSV unusable)"
    else:
        out = sliced.to_numpy(dtype=np.float32)

    _cache_put(cache_key, out)
    return out, label
|