Spaces:
Sleeping
Sleeping
| from enum import IntEnum | |
| import numpy as np | |
| STEPS_PER_HOUR = 12 | |
| class Decision(IntEnum): | |
| # use solar to satisfy consumption, | |
| # and if it is not enough, use network. | |
| # BESS is not discharged in this mode, | |
| # but might be charged if solar has surplus. | |
| PASSIVE = 0 | |
| # use the battery if possible and necessary. | |
| # the possible part means that there's charge in it, | |
| # and the necessary part means that the consumption | |
| # is not already covered by solar. | |
| DISCHARGE = 1 | |
| # use the network to charge the battery | |
| # this is similar to PASSIVE, but forces the | |
| # BESS to be charged, even if solar does not cover | |
| # the whole of consumption plus BESS. | |
| NETWORK_CHARGE = 2 | |
| class RandomDecider: | |
| def __init__(self, params, precalculated_supplier): | |
| self.input_window_size = STEPS_PER_HOUR * 24 # day long window. | |
| self.precalculated_supplier = precalculated_supplier | |
| assert params.shape == (2, ) | |
| # param_1 is prob of choosing PASSIVE | |
| # param_2 is prob of choosing NETWORK_CHARGE | |
| self.passive_prob, self.network_prob = params | |
| def decide(self, prod_pred, cons_pred, fees, battery_model): | |
| r = np.random.rand() | |
| if r < self.passive_prob: | |
| return Decision.PASSIVE | |
| elif r < self.passive_prob + self.network_prob: | |
| return Decision.NETWORK_CHARGE | |
| else: | |
| return Decision.DISCHARGE | |
| def clip_params(params): | |
| assert params.shape == (2, ) | |
| p1, p2 = params | |
| p1 = max((0, p1)) | |
| p2 = max((0, p2)) | |
| s = p1 + p2 | |
| if s > 1: | |
| p1 /= s | |
| p2 /= s | |
| return np.array([p1, p2]) | |
| def initial_params(): | |
| init_mean = np.array([1/3, 1/3]) | |
| init_scale = np.array([1.0, 1.0]) | |
| return init_mean, init_scale | |
| # mock class as usual | |
| # output_window_size is not yet used, always decides one timestep. | |
| class Decider: | |
| def __init__(self, params, precalculated_supplier): | |
| self.input_window_size = STEPS_PER_HOUR * 24 # day long window. | |
| self.precalculated_supplier = precalculated_supplier | |
| assert params.shape == (2, ) | |
| # param_1 is how many minutes do we look ahead to decide if there's | |
| # an upcoming shortage. | |
| # param_2 is the threshhold for shortage, in kW not kWh, | |
| # that is, parametrized as an average over the window. | |
| self.surdemand_lookahead_window_min, self.lookahead_surdemand_kw = params | |
| def decide(self, prod_pred, cons_pred, fees, battery_model): | |
| # TODO 15 minutes demand charge window hardwired at this weird place | |
| DEMAND_CHARGE_WINDOW_MIN = 15 | |
| time_interval_min = self.precalculated_supplier.time_index.freq.n | |
| assert DEMAND_CHARGE_WINDOW_MIN % time_interval_min == 0 | |
| peak_shaving_window = DEMAND_CHARGE_WINDOW_MIN // time_interval_min | |
| step_in_hour = time_interval_min / 60 # [hour], the length of a time step. | |
| deficit_kw = (cons_pred[:peak_shaving_window] - prod_pred[:peak_shaving_window]).clip(min=0) | |
| deficit_kwh = (step_in_hour * deficit_kw).sum() | |
| surdemand_window = int(self.surdemand_lookahead_window_min // time_interval_min) | |
| mean_surdemand_kw = (cons_pred[:surdemand_window] - prod_pred[:surdemand_window]).mean() | |
| current_fee = fees[0] | |
| # TODO this should not be a hard threshold, and more importantly, | |
| # it should not be hardwired. | |
| HARDWIRED_THRESHOLD_FOR_CHEAP_POWER_HUF_PER_KWH = 20 # [HUF/kWh] | |
| if mean_surdemand_kw > self.lookahead_surdemand_kw and current_fee <= HARDWIRED_THRESHOLD_FOR_CHEAP_POWER_HUF_PER_KWH: | |
| decision = Decision.NETWORK_CHARGE | |
| # peak shaving | |
| elif deficit_kwh > self.precalculated_supplier.peak_demand: | |
| decision = Decision.DISCHARGE | |
| else: | |
| decision = Decision.PASSIVE | |
| return decision | |
| # this is called by the optimizer so that meaningless parameter settings are not attempted | |
| # we could vectorize this easily, but it's not a bottleneck, the simulation is. | |
| def clip_params(params): | |
| assert params.shape == (2, ) | |
| surdemand_lookahead_window_min, lookahead_surdemand_kw = params | |
| surdemand_lookahead_window_min = np.clip(surdemand_lookahead_window_min, 5, 60 * 24 * 3) | |
| # no-op right now: | |
| lookahead_surdemand_kw = np.clip(lookahead_surdemand_kw, -np.inf, np.inf) | |
| return np.array([surdemand_lookahead_window_min, lookahead_surdemand_kw]) | |
| def initial_params(): | |
| # surdemand_lookahead_window_min, lookahead_surdemand_kw | |
| # param1 [minutes]. one day mean, half day scale for lookahead horizon. | |
| # param2 [kWh], averaged over the horizon. negative values are meaningful. | |
| init_mean = np.array([60 * 24.0, 0.0]) | |
| init_scale = np.array([60 * 12.0, 100.0]) | |
| return init_mean, init_scale | |