File size: 12,096 Bytes
938949f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
"""
EnergyBudgetPlanner: hierarchical energy sacrifice budget for agrivoltaic control.

Budget hierarchy:
  Annual → Monthly → Weekly → Daily → 15-min Slot

The system defaults to full astronomical tracking (max energy). Shading
interventions draw from a tight budget (default 5% of annual generation).
Budget is pre-allocated down the hierarchy so that hot days/hours get more,
and the system never overspends.

References:
  - config/settings.py for all thresholds and weights
  - context/2_plan.md §3.1 for design rationale
"""

from __future__ import annotations

from datetime import date, timedelta
from typing import Optional

import numpy as np
import pandas as pd

from config.settings import (
    ANNUAL_RESERVE_PCT,
    DAILY_MARGIN_PCT,
    MAX_ENERGY_REDUCTION_PCT,
    MONTHLY_BUDGET_WEIGHTS,
    NO_SHADE_BEFORE_HOUR,
    WEEKLY_RESERVE_PCT,
)


class EnergyBudgetPlanner:
    """Hierarchical energy sacrifice budget for agrivoltaic shading control.

    Parameters
    ----------
    max_energy_reduction_pct : float
        Maximum fraction of annual PV generation the vines can "spend" on
        shading (default from config: 5%).
    shadow_model : object, optional
        ShadowModel instance used to estimate slot-level energy potential.
        If None, annual plan uses a simplified analytical estimate.
    """

    def __init__(
        self,
        max_energy_reduction_pct: float = MAX_ENERGY_REDUCTION_PCT,
        shadow_model=None,
    ):
        self.max_pct = max_energy_reduction_pct
        self.shadow = shadow_model

    # ------------------------------------------------------------------
    # Annual plan
    # ------------------------------------------------------------------

    def compute_annual_plan(self, year: int) -> dict:
        """Compute seasonal energy potential and allocate monthly budgets.

        Iterates every 15-min slot from May 1 to Sep 30, computing energy
        under astronomical tracking.  Then distributes the sacrifice budget
        across months using MONTHLY_BUDGET_WEIGHTS.

        Returns dict with:
          year, total_potential_kWh, total_budget_kWh, annual_reserve_kWh,
          monthly_budgets (dict[int, float]), budget_spent_kWh
        """
        season_start = pd.Timestamp(f"{year}-05-01", tz="UTC")
        season_end = pd.Timestamp(f"{year}-09-30 23:45", tz="UTC")
        times = pd.date_range(season_start, season_end, freq="15min")

        if self.shadow is not None:
            energy_per_slot = self._energy_from_shadow_model(times)
        else:
            energy_per_slot = self._energy_analytical(times)

        total_potential = float(np.sum(energy_per_slot))
        total_budget = total_potential * self.max_pct / 100.0
        annual_reserve = total_budget * ANNUAL_RESERVE_PCT / 100.0
        distributable = total_budget - annual_reserve

        monthly_budgets = {
            month: distributable * weight
            for month, weight in MONTHLY_BUDGET_WEIGHTS.items()
        }

        return {
            "year": year,
            "total_potential_kWh": round(total_potential, 2),
            "total_budget_kWh": round(total_budget, 2),
            "annual_reserve_kWh": round(annual_reserve, 2),
            "monthly_budgets": {m: round(v, 4) for m, v in monthly_budgets.items()},
            "budget_spent_kWh": 0.0,
        }

    def _energy_from_shadow_model(self, times: pd.DatetimeIndex) -> np.ndarray:
        """Estimate per-slot energy using the ShadowModel's solar position."""
        solar_pos = self.shadow.get_solar_position(times)
        energy = []
        for _, sp in solar_pos.iterrows():
            if sp["solar_elevation"] <= 0:
                energy.append(0.0)
                continue
            tracker = self.shadow.compute_tracker_tilt(
                sp["solar_azimuth"], sp["solar_elevation"]
            )
            # cos(AOI) × 0.25h slot duration → kWh per kWp
            e = max(0.0, np.cos(np.radians(tracker["aoi"]))) * 0.25
            energy.append(e)
        return np.array(energy)

    @staticmethod
    def _energy_analytical(times: pd.DatetimeIndex) -> np.ndarray:
        """Simplified analytical estimate when no ShadowModel is available.

        Vectorized: computes all ~15k slots in one numpy pass.
        Uses a sinusoidal day profile peaking at solar noon.  Good enough
        for budget planning; not used for real-time control.
        """
        from config.settings import SITE_LATITUDE

        hour_utc = times.hour + times.minute / 60.0
        solar_noon_utc = 12.0 - 34.8 / 15.0  # ≈ 9.68 UTC
        hour_angle = (hour_utc - solar_noon_utc) * 15.0  # degrees

        lat_rad = np.radians(SITE_LATITUDE)
        doy = times.dayofyear
        decl_rad = np.radians(23.45 * np.sin(np.radians(360.0 / 365.0 * (doy - 81))))
        ha_rad = np.radians(hour_angle)

        sin_elev = (
            np.sin(lat_rad) * np.sin(decl_rad)
            + np.cos(lat_rad) * np.cos(decl_rad) * np.cos(ha_rad)
        )
        # Astronomical tracking → AOI ≈ 0 → cos(AOI) ≈ 1
        # Scale by clearness (~0.75 for Sde Boker) and slot duration (0.25h)
        return np.where(sin_elev > 0, sin_elev * 0.75 * 0.25, 0.0)

    # ------------------------------------------------------------------
    # Weekly plan
    # ------------------------------------------------------------------

    def compute_weekly_plan(
        self,
        week_start: pd.Timestamp | date,
        monthly_remaining: float,
        forecast_tmax: Optional[list[float]] = None,
        rollover: float = 0.0,
    ) -> dict:
        """Distribute weekly budget to days, weighted by (Tmax - 30)².

        Days with forecast Tmax < 30°C get zero allocation (no stress
        expected).  Hot days get quadratically more budget.

        Parameters
        ----------
        week_start : date-like
            First day of the week.
        monthly_remaining : float
            Remaining monthly budget (kWh).
        forecast_tmax : list of 7 floats, optional
            Forecast daily maximum temperature for each day of the week.
            If None, budget is split evenly.
        rollover : float
            Unspent budget rolled over from the previous week.

        Returns dict with:
          weekly_total_kWh, weekly_reserve_kWh, daily_budgets_kWh (list[7])
        """
        if not isinstance(week_start, pd.Timestamp):
            week_start = pd.Timestamp(week_start)

        month = week_start.month
        # Estimate weeks remaining in the month
        if month == 12:
            month_end = pd.Timestamp(f"{week_start.year}-12-31")
        elif month == 9:
            month_end = pd.Timestamp(f"{week_start.year}-09-30")
        else:
            month_end = pd.Timestamp(
                f"{week_start.year}-{month + 1:02d}-01"
            ) - timedelta(days=1)
        days_left = max(1, (month_end - week_start).days)
        weeks_left = max(1, days_left // 7)

        weekly_raw = monthly_remaining / weeks_left + rollover
        weekly_reserve = weekly_raw * WEEKLY_RESERVE_PCT / 100.0
        distributable = weekly_raw - weekly_reserve

        if forecast_tmax is not None and len(forecast_tmax) == 7:
            weights = [max(0.0, t - 30.0) ** 2 for t in forecast_tmax]
            total_w = sum(weights)
            if total_w > 0:
                daily = [distributable * w / total_w for w in weights]
            else:
                daily = [0.0] * 7  # all days < 30°C → no budget needed
        else:
            daily = [distributable / 7.0] * 7

        return {
            "weekly_total_kWh": round(weekly_raw, 4),
            "weekly_reserve_kWh": round(weekly_reserve, 4),
            "daily_budgets_kWh": [round(d, 4) for d in daily],
        }

    # ------------------------------------------------------------------
    # Daily plan
    # ------------------------------------------------------------------

    def compute_daily_plan(
        self,
        day: date | pd.Timestamp,
        daily_budget: float,
        rollover: float = 0.0,
    ) -> dict:
        """Distribute daily budget to 15-min slots.

        Zero before NO_SHADE_BEFORE_HOUR (10:00).  Peak allocation at
        11:00–14:00 (60% of planned budget).

        Returns dict with:
          date, daily_total_kWh, daily_margin_kWh, daily_margin_remaining_kWh,
          slot_budgets (dict[str, float]), cumulative_spent
        """
        daily_raw = daily_budget + rollover
        daily_margin = daily_raw * DAILY_MARGIN_PCT / 100.0
        planned = daily_raw - daily_margin

        # Time blocks with their share of the planned budget.
        # The non-zero weights must sum to 1.0.
        transition_end = max(NO_SHADE_BEFORE_HOUR + 1, 11)
        blocks = [
            ((5, NO_SHADE_BEFORE_HOUR), 0.00),         # morning — no shade
            ((NO_SHADE_BEFORE_HOUR, transition_end), 0.05),  # transition
            ((transition_end, 14), 0.60),               # peak stress window
            ((14, 16), 0.30),                            # sustained heat
            ((16, 20), 0.05),                            # rare late stress
        ]

        slot_budgets: dict[str, float] = {}
        for (h_start, h_end), weight in blocks:
            block_budget = planned * weight
            n_slots = (h_end - h_start) * 4  # 4 slots per hour
            per_slot = block_budget / n_slots if n_slots > 0 else 0.0
            for h in range(h_start, h_end):
                for m in (0, 15, 30, 45):
                    slot_budgets[f"{h:02d}:{m:02d}"] = round(per_slot, 6)

        return {
            "date": str(day),
            "daily_total_kWh": round(daily_raw, 4),
            "daily_margin_kWh": round(daily_margin, 4),
            "daily_margin_remaining_kWh": round(daily_margin, 4),
            "slot_budgets": slot_budgets,
            "cumulative_spent": 0.0,
        }

    # ------------------------------------------------------------------
    # Slot-level execution helpers
    # ------------------------------------------------------------------

    def spend_slot(self, daily_plan: dict, slot_key: str, amount: float) -> float:
        """Deduct energy from a slot's budget. Returns amount actually spent.

        If the slot budget is insufficient, draws from the daily margin.
        """
        available = daily_plan["slot_budgets"].get(slot_key, 0.0)
        if amount <= available:
            daily_plan["slot_budgets"][slot_key] -= amount
            daily_plan["cumulative_spent"] += amount
            return amount

        # Slot budget exhausted — try daily margin
        shortfall = amount - available
        margin = daily_plan["daily_margin_remaining_kWh"]
        from_margin = min(shortfall, margin)
        total_spent = available + from_margin

        daily_plan["slot_budgets"][slot_key] = 0.0
        daily_plan["daily_margin_remaining_kWh"] -= from_margin
        daily_plan["cumulative_spent"] += total_spent
        return round(total_spent, 6)

    def emergency_draw(self, annual_plan: dict, amount: float) -> float:
        """Draw from annual reserve for extreme heat events.

        Returns the amount actually drawn (may be less than requested if
        the reserve is depleted).
        """
        available = annual_plan["annual_reserve_kWh"]
        drawn = min(amount, available)
        annual_plan["annual_reserve_kWh"] = round(available - drawn, 4)
        annual_plan["budget_spent_kWh"] = round(
            annual_plan["budget_spent_kWh"] + drawn, 4
        )
        return round(drawn, 4)

    # ------------------------------------------------------------------
    # Rollover helper
    # ------------------------------------------------------------------

    def compute_daily_rollover(self, daily_plan: dict) -> float:
        """Compute unspent budget at end of day (available for next day)."""
        unspent_slots = sum(daily_plan["slot_budgets"].values())
        unspent_margin = daily_plan["daily_margin_remaining_kWh"]
        return round(unspent_slots + unspent_margin, 4)