File size: 7,281 Bytes
308474b
a8fbd60
308474b
a8fbd60
a4ad2d3
a8fbd60
 
 
 
 
 
a4ad2d3
a8fbd60
 
a4ad2d3
a8fbd60
 
 
308474b
 
 
 
 
a4ad2d3
308474b
a8fbd60
308474b
 
 
 
 
 
 
a8fbd60
 
 
308474b
 
 
a8fbd60
308474b
a8fbd60
308474b
a8fbd60
308474b
a8fbd60
 
 
 
 
 
308474b
 
a8fbd60
 
 
 
 
308474b
a4ad2d3
a8fbd60
 
 
308474b
a8fbd60
 
 
 
308474b
 
a8fbd60
 
308474b
 
a8fbd60
308474b
 
 
a8fbd60
 
308474b
 
 
a8fbd60
 
 
 
 
 
 
 
 
308474b
a8fbd60
 
 
 
 
 
 
308474b
 
a8fbd60
 
 
 
308474b
a8fbd60
 
 
 
 
 
 
 
 
 
 
a4ad2d3
308474b
a8fbd60
 
 
 
 
 
 
 
 
 
 
 
308474b
 
cfbab36
a8fbd60
 
 
cfbab36
 
a8fbd60
 
 
 
cfbab36
 
a8fbd60
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""
High-level ISO-NE per-zone demand fetcher for the Space.

Wraps the low-level fetcher in ``iso_ne_zonal.py`` with:

  - In-memory cache (5-minute TTL) so repeated clicks within a few
    minutes don't refetch from ISO-NE
  - Optional bundled CSV fallback for offline / API-down scenarios
  - Optional integration with a long-history CSV pulled from the data
    repo at Space startup (used to seed Chronos context without
    re-fetching 30 days of ISO-NE on every click)

Public API kept stable so ``app.py`` can swap from the old EIA-based
implementation without further changes:

  - ``ZONE_COLS``                          : list of 8 zone names
  - ``fetch_recent_demand_mwh(end_dt)``    : (24, 8) MWh + source label
  - ``fetch_long_history_mwh(end_dt, hours=720)`` : (hours, 8) MWh + label
"""

from __future__ import annotations

import logging
import os
from datetime import datetime, timedelta, timezone
from io import StringIO
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd
import requests

from iso_ne_zonal import ZONE_COLS, fetch_range, fetch_recent_hours

# Module-level logger; fetch fallbacks and splice failures below report through it.
logger = logging.getLogger(__name__)

# Bundled fallback data shipped in the ``assets`` directory next to this module.
ASSETS_DIR = Path(__file__).parent / "assets"
# Short sample: its last 24 rows back the "recent" fallback.
SAMPLE_CSV = ASSETS_DIR / "sample_demand_2022.csv"
# Optional long sample: used for the long-history fallback when the file exists.
SAMPLE_CSV_LONG = ASSETS_DIR / "sample_demand_2022_long.csv"

# In-memory cache: { ("recent", end_hour) | ("long", end_hour, hours) -> (ts, np.ndarray) }
# ``ts`` is the timezone-aware UTC time at which the entry was stored.
_CACHE: dict = {}
# Entries at least this many seconds old are treated as cache misses (5 minutes).
_CACHE_TTL_SECONDS = 300

# Path of the data-repo 30-day CSV (refreshed daily by GitHub Actions in
# new-england-real-time-power-predict-data; downloaded by app.py at
# startup and saved to /tmp). When present, fetch_long_history_mwh
# uses it as the base and splices in the last 1-2 days from live API.
# The location can be overridden with the DATA_REPO_30D_CSV_PATH
# environment variable; the default is /tmp/iso_ne_30d.csv.
DATA_REPO_30D_CSV_PATH = Path(os.environ.get(
    "DATA_REPO_30D_CSV_PATH", "/tmp/iso_ne_30d.csv"))


def _hour_floor_utc(dt: datetime) -> datetime:
    """Truncate ``dt`` to the top of its hour and return it as naive UTC.

    A naive ``dt`` is interpreted as already being in UTC; an aware ``dt``
    is converted to UTC first. The returned value carries no ``tzinfo``.
    """
    aware = dt.replace(tzinfo=timezone.utc) if dt.tzinfo is None else dt
    as_utc = aware.astimezone(timezone.utc)
    # Drop sub-hour precision and the tzinfo in one step.
    return as_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)


def _cache_get(key: tuple) -> Optional[np.ndarray]:
    """Look up ``key`` in the in-memory cache.

    Returns a copy of the stored array when the entry is younger than
    ``_CACHE_TTL_SECONDS``; returns ``None`` when the key is absent or the
    entry has aged out. A copy is returned so callers cannot mutate the
    cached data.
    """
    entry = _CACHE.get(key)
    if entry is None:
        return None
    stored_at, payload = entry
    age_seconds = (datetime.now(timezone.utc) - stored_at).total_seconds()
    return payload.copy() if age_seconds < _CACHE_TTL_SECONDS else None


def _cache_put(key: tuple, arr: np.ndarray) -> None:
    """Store a private copy of ``arr`` under ``key``, stamped with the current UTC time.

    Cache keys embed the requested hour, so entries for past hours are never
    looked up again. To keep the cache from growing without bound, every
    entry that is already older than ``_CACHE_TTL_SECONDS`` is dropped before
    the new one is inserted. Expired entries are treated as misses by
    ``_cache_get`` anyway, so purging them does not change lookup results.

    Args:
        key: Hashable cache key, e.g. ``("recent", end_hour)``.
        arr: Array to cache; it is copied so later mutation by the caller
            does not affect the cached value.
    """
    now = datetime.now(timezone.utc)
    # Snapshot the items first so the dict is not mutated while being iterated.
    for old_key, (stored_at, _) in list(_CACHE.items()):
        if (now - stored_at).total_seconds() >= _CACHE_TTL_SECONDS:
            _CACHE.pop(old_key, None)
    _CACHE[key] = (now, arr.copy())


def _load_sample_recent() -> np.ndarray:
    """Return the last 24 rows of the bundled sample CSV as a float32 array.

    Only the ``ZONE_COLS`` columns are kept.

    Raises:
        RuntimeError: If the resulting array is not exactly ``(24, 8)``,
            e.g. because the CSV has fewer than 24 rows.
    """
    frame = pd.read_csv(SAMPLE_CSV)
    last_day = frame[ZONE_COLS].tail(24)
    values = last_day.to_numpy(dtype=np.float32)
    if values.shape == (24, 8):
        return values
    raise RuntimeError(
        f"Bundled sample_demand_2022.csv has wrong shape {values.shape}")


def _load_sample_long(hours: int) -> np.ndarray:
    """Return ``hours`` rows of bundled sample demand as a float32 array.

    If the long sample CSV exists and its last ``hours`` rows form an
    ``(hours, 8)`` array, those rows are returned. Otherwise the 24-row
    short sample is repeated end-to-end and cut to ``hours`` rows.
    """
    if SAMPLE_CSV_LONG.exists():
        frame = pd.read_csv(SAMPLE_CSV_LONG)
        tail = frame[ZONE_COLS].tail(hours).to_numpy(dtype=np.float32)
        if tail.shape == (hours, 8):
            return tail
    # Long sample missing or too short: tile the 24-hour sample instead.
    one_day = _load_sample_recent()
    repeats = hours // 24 + 1
    tiled = np.tile(one_day, (repeats, 1))
    return tiled[:hours].astype(np.float32)


def fetch_recent_demand_mwh(end_dt: Optional[datetime] = None
                              ) -> tuple[np.ndarray, str]:
    """Return the most recent 24 hours of per-zone demand plus a source label.

    The window ends at ``end_dt`` floored to the hour in UTC (or the current
    hour when ``end_dt`` is ``None``). Results are served from the in-memory
    cache when a fresh entry exists; otherwise the live ISO-NE fetcher is
    called, and if that fails for any reason the bundled sample CSV is
    returned instead.

    Args:
        end_dt: End of the requested window. Naive values are treated as UTC.

    Returns:
        ``(array, label)`` where ``array`` is the MWh matrix (expected to be
        ``(24, 8)``; the live result is not re-validated here) and ``label``
        is one of:

          - ``"cached"``
          - ``"live (ISO-NE 5-min zonal)"``
          - ``"live (ISO-NE 5-min zonal, latest hour <iso>, lag <N>h)"``
            when the newest available hour is older than ``end_dt``
          - ``"sample-2022 (ISO-NE unreachable)"``

    Raises:
        Whatever ``_load_sample_recent`` raises if the live fetch fails and
        the bundled sample cannot be loaded.
    """
    if end_dt is None:
        end_dt = datetime.now(timezone.utc)
    end_dt = _hour_floor_utc(end_dt)
    cache_key = ("recent", end_dt)
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached, "cached"
    try:
        arr, latest = fetch_recent_hours(end_dt, hours=24)
        _cache_put(cache_key, arr)
        # NOTE(review): assumes `latest` is a naive-UTC datetime like `end_dt`;
        # an aware value would raise here and trigger the sample fallback.
        lag_hours = (end_dt - latest).total_seconds() / 3600
        # Build the label in two explicit branches. The previous
        # `label += a if cond else b` form appended the no-lag text to the
        # half-built lag prefix and produced a garbled label.
        if lag_hours > 0:
            label = (
                f"live (ISO-NE 5-min zonal, latest hour {latest.isoformat()}, "
                f"lag {lag_hours:.0f}h)"
            )
        else:
            label = "live (ISO-NE 5-min zonal)"
        return arr, label
    except Exception as e:  # noqa: BLE001
        # Deliberate best-effort: any live-fetch problem degrades to sample data.
        logger.warning("ISO-NE realtime fetch failed: %s; falling back to bundled CSV", e)
        return _load_sample_recent(), "sample-2022 (ISO-NE unreachable)"


def _load_30d_base() -> Optional[pd.DataFrame]:
    """Read the data-repo 30-day per-zone CSV, if it has been downloaded.

    Returns a frame indexed by the parsed ``timestamp_utc`` column, sorted
    by that index and restricted to ``ZONE_COLS``. Returns ``None`` when the
    file does not exist or cannot be read/parsed (the failure is logged).
    """
    csv_path = DATA_REPO_30D_CSV_PATH
    if not csv_path.exists():
        return None
    try:
        frame = pd.read_csv(csv_path, parse_dates=["timestamp_utc"])
        by_time = frame.set_index("timestamp_utc")
        return by_time.sort_index()[ZONE_COLS]
    except Exception as exc:  # noqa: BLE001
        logger.warning("Failed to load 30d base CSV at %s: %s",
                        DATA_REPO_30D_CSV_PATH, exc)
        return None


def fetch_long_history_mwh(end_dt: Optional[datetime] = None,
                             hours: int = 720
                             ) -> tuple[np.ndarray, str]:
    """Return ``(hours, 8)`` MWh of per-zone history ending at ``end_dt - 1h``.

    Strategy:
      1. Serve from the in-memory cache when a fresh entry exists.
      2. If the data repo's 30d base CSV is not available, return the
         bundled long-history sample (no live splice is attempted).
      3. Otherwise merge the last 2 days from the live ISO-NE API into the
         base: live values win where both have data, and live hours that
         are newer than the CSV are appended so the tail is fresh. A
         failed splice is logged and the base is used as-is.
      4. If the (merged) base is empty or starts after the requested
         window, return the bundled sample and label it as sample data.
      5. Otherwise reindex to the exact hourly window and fill gaps by
         time interpolation (up to 12 consecutive hours), then
         forward/backward fill.

    Args:
        end_dt: Exclusive end of the window; floored to the hour in UTC.
            Defaults to now. Naive values are treated as UTC.
        hours: Number of hourly rows to return.

    Returns:
        ``(array, label)`` where ``label`` is one of ``"cached"``,
        ``"sample-2022 (no data-repo CSV)"``,
        ``"sample-2022 (data-repo CSV does not cover window)"``,
        ``"data-repo 30d"`` or ``"data-repo 30d + live splice"``.
    """
    if end_dt is None:
        end_dt = datetime.now(timezone.utc)
    end_dt = _hour_floor_utc(end_dt)
    cache_key = ("long", end_dt, hours)
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached, "cached"

    target_end = end_dt - timedelta(hours=1)        # last hour we want
    target_start = target_end - timedelta(hours=hours - 1)

    base = _load_30d_base()
    if base is None:
        out = _load_sample_long(hours)
        _cache_put(cache_key, out)
        return out, "sample-2022 (no data-repo CSV)"

    label = "data-repo 30d"

    # Try to splice live ISO-NE for the last 2 days for freshness.
    try:
        live = fetch_range(target_end - timedelta(days=2), target_end,
                            hourly=True)
        # NOTE(review): assumes `live` is (coercible to) a DataFrame indexed
        # by naive-UTC hourly timestamps, like `base` — confirm against
        # iso_ne_zonal.fetch_range.
        live_df = (pd.DataFrame(live)
                   .reindex(columns=ZONE_COLS)
                   .dropna(how="all"))
        if live_df.empty:
            raise ValueError("live fetch returned no usable zone rows")
        if not isinstance(live_df.index, pd.DatetimeIndex):
            raise TypeError("live frame is not indexed by timestamps")
        # DataFrame.update() only overwrites rows that already exist in
        # `base`, so hours newer than the daily CSV were silently dropped.
        # combine_first() keeps live values where present AND extends the
        # index with the new hours; base fills anything live lacks.
        merged = live_df.combine_first(base).sort_index()
        if not isinstance(merged.index, pd.DatetimeIndex):
            raise TypeError("live and base timestamps are not compatible")
        base = merged
        label += " + live splice"
    except Exception as e:  # noqa: BLE001
        # Best-effort: a failed splice leaves `base` untouched.
        logger.info("Live splice into long history failed: %s", e)

    # Ensure the base reaches back to target_start. An empty base has a NaT
    # minimum, which compares False against anything, so test it explicitly.
    if base.empty or base.index.min() > target_start:
        logger.info("30d base starts at %s, window needs %s; "
                     "using bundled sample instead",
                     base.index.min(), target_start)
        out = _load_sample_long(hours)
        # The returned data is sample data, so say so in the label.
        label = "sample-2022 (data-repo CSV does not cover window)"
    else:
        # Slice exact window
        idx = pd.date_range(start=target_start, end=target_end, freq="1h")
        sliced = base.reindex(idx)
        if sliced.isna().any().any():
            logger.info("30d base has %d NaN rows in window; interpolating",
                         int(sliced.isna().any(axis=1).sum()))
            sliced = sliced.interpolate(method="time", limit=12).ffill().bfill()
        out = sliced[ZONE_COLS].to_numpy(dtype=np.float32)

    _cache_put(cache_key, out)
    return out, label