# Standard library
import random
from collections import deque

# Third-party
import gym
import numpy as np
import pandas as pd

# Both RNGs are seeded at import time: `random` drives battery-model
# assignment and initial state of charge, `np.random` drives which day an
# episode samples. Seeding both keeps runs reproducible.
random.seed(42)
np.random.seed(42)
|
|
class SolarSys(gym.Env):
    """Multi-agent residential solar / battery / peer-to-peer energy trading env.

    Every house that has a ``grid_<id>`` column (paired with a
    ``total_solar_<id>`` column) is one agent. Houses with any positive solar
    reading are given a randomly chosen battery model. Each episode is one day
    of rows sliced from the resampled data.

    Observation: one 8-feature row per agent
        [own_demand, own_solar, soc_fraction (-1.0 if no battery), grid_price,
         peer_price, other agents' total demand, other agents' total solar,
         hour of day].

    Action: one 6-value row per agent, clipped to [0, 1]
        [sell_grid, buy_grid, sell_peers, buy_peers, charge_batt,
         discharge_batt].

    ``reset()`` returns a list of per-agent observation rows;
    ``step()`` returns ``(obs_list, rewards_list, done, info)``.
    """

    def __init__(
        self,
        data_path="/path/to/project/training/200houses_152days_TRAIN.csv",
        state="oklahoma",
        time_freq="15T",
    ):
        """Load and resample the household data and configure tariffs and batteries.

        Args:
            data_path: CSV with a ``local_15min`` timestamp column plus
                ``grid_<id>`` and ``total_solar_<id>`` columns per house.
            state: tariff region, case-insensitive: ``"oklahoma"``,
                ``"colorado"`` or ``"pennsylvania"``.
            time_freq: pandas offset alias used to resample the data. It must
                be a fixed-length frequency because the step length is derived
                from it. (Newer pandas prefers ``"15min"`` over ``"15T"``.)

        Raises:
            ValueError: unsupported ``state``, unreadable or empty data, or
                fewer rows than one full day after resampling.
            FileNotFoundError: ``data_path`` does not exist.
        """
        super().__init__()

        self.data_path = data_path
        self.time_freq = time_freq
        self.state = state.lower()

        # Per-state tariff.
        #   max_grid_price: used for reward normalisation and peak-hour detection.
        #   feed_in_tariff: price paid for grid exports.
        #   price_function: time-of-use grid price curve.
        self._pricing_info = {
            "oklahoma": {
                "max_grid_price": 0.2112,
                "feed_in_tariff": 0.04,
                "price_function": self._get_oklahoma_price,
            },
            "colorado": {
                "max_grid_price": 0.32,
                "feed_in_tariff": 0.055,
                "price_function": self._get_colorado_price,
            },
            "pennsylvania": {
                "max_grid_price": 0.12505,
                "feed_in_tariff": 0.06,
                "price_function": self._get_pennsylvania_price,
            },
        }

        if self.state not in self._pricing_info:
            raise ValueError(f"State '{self.state}' is not supported. Available states: {list(self._pricing_info.keys())}")

        state_config = self._pricing_info[self.state]
        self.max_grid_price = state_config["max_grid_price"]
        self.feed_in_tariff = state_config["feed_in_tariff"]
        self._get_price_function = state_config["price_function"]

        # Load, index by (UTC-normalised) timestamp and resample.
        # Chain the original exception so the root cause stays visible.
        try:
            all_data = pd.read_csv(data_path)
            all_data["local_15min"] = pd.to_datetime(all_data["local_15min"], utc=True)
            all_data.set_index("local_15min", inplace=True)
            all_data = all_data.resample(time_freq).mean()
        except FileNotFoundError as e:
            raise FileNotFoundError(f"Data file {data_path} not found.") from e
        except pd.errors.EmptyDataError as e:
            raise ValueError(f"Data file {data_path} is empty.") from e
        except Exception as e:
            raise ValueError(f"Error loading data: {e}") from e

        grid_cols = [c for c in all_data.columns if c.startswith("grid_")]
        solar_cols = [c for c in all_data.columns if c.startswith("total_solar_")]
        all_grid = all_data[grid_cols].values
        all_solar = all_data[solar_cols].values

        # Dataset-wide maxima (+epsilon so they are safe divisors).
        # nanmax: resampling produces NaN for empty bins, and plain .max()
        # would make the whole statistic NaN.
        self.global_max_demand = float(np.nanmax(all_grid + all_solar)) + 1e-8
        self.global_max_solar = float(np.nanmax(all_solar)) + 1e-8

        self.all_data = all_data

        freq_offset = pd.tseries.frequencies.to_offset(time_freq)
        minutes_per_step = freq_offset.nanos / 1e9 / 60.0
        self.steps_per_day = int(24 * 60 // minutes_per_step)

        total_rows = len(self.all_data)
        self.total_days = total_rows // self.steps_per_day
        if self.total_days < 1:
            raise ValueError(
                f"After resampling, dataset has {total_rows} rows, which is "
                f"less than a single day of {self.steps_per_day} steps."
            )

        # Split only on the first "_" so an id that itself contains "_"
        # still matches its total_solar_<id> column.
        self.house_ids = [col.split("_", 1)[1] for col in grid_cols]
        self.num_agents = len(self.house_ids)

        # Grid import each house would have had on its own: the positive part
        # of its grid column. NaN gaps count as zero import, the same way
        # reset() fills gaps, so the baseline metrics never turn into NaN.
        self.original_no_p2p_import = {
            hid: self.all_data[f"grid_{hid}"].fillna(0.0).clip(lower=0.0).values
            for hid in self.house_ids
        }

        # Group 1 = houses whose total solar over the dataset is positive.
        house_solar_cols = [f"total_solar_{hid}" for hid in self.house_ids]
        solar_sums = self.all_data[house_solar_cols].sum(axis=0).to_dict()
        self.agent_groups = [
            1 if solar_sums[f"total_solar_{hid}"] > 0 else 0
            for hid in self.house_ids
        ]

        self.group_counts = {
            0: self.agent_groups.count(0),
            1: self.agent_groups.count(1),
        }
        print(f"Number of houses in each group: {self.group_counts}")

        self.battery_options = {
            "teslapowerwall": {"max_capacity": 13.5, "charge_efficiency": 0.95, "discharge_efficiency": 0.90, "max_charge_rate": 5.0, "max_discharge_rate": 5.0, "degradation_cost_per_kwh": 0.005},
            "enphase": {"max_capacity": 5.0, "charge_efficiency": 0.95, "discharge_efficiency": 0.90, "max_charge_rate": 2.0, "max_discharge_rate": 2.0, "degradation_cost_per_kwh": 0.005},
            "franklin": {"max_capacity": 15.0, "charge_efficiency": 0.95, "discharge_efficiency": 0.90, "max_charge_rate": 6.0, "max_discharge_rate": 6.0, "degradation_cost_per_kwh": 0.005},
        }

        # Only houses with at least one positive solar reading get a battery.
        self.solar_houses = [
            hid for hid in self.house_ids
            if (self.all_data[f"total_solar_{hid}"] > 0).any()
        ]

        # Random battery model per solar house (seeded by the module-level
        # random.seed). Each gets its own dict, so mutating "soc" is per-house.
        self.batteries = {}
        for hid in self.solar_houses:
            choice = random.choice(list(self.battery_options))
            specs = self.battery_options[choice]
            self.batteries[hid] = {"soc": 0.0, **specs}

        self.observation_space = gym.spaces.Box(
            low=-np.inf, high=np.inf,
            shape=(self.num_agents, 8),
            dtype=np.float32,
        )
        self.action_space = gym.spaces.Box(
            low=0.0,
            high=1.0,
            shape=(self.num_agents, 6),
            dtype=np.float32,
        )

        self.episode_metrics = {}
        self._initialize_episode_metrics()

        # Per-episode state; populated by reset().
        self.data = None
        self.env_log = []
        self.day_index = -1
        self.current_step = 0
        self.num_steps = self.steps_per_day
        self.demands = {}
        self.solars = {}
        # NOTE(review): zeroed by reset() but not updated by step() in this class.
        self.previous_actions = {
            hid: np.zeros(6) for hid in self.house_ids
        }

    def _initialize_episode_metrics(self):
        """Initialize or reset all metrics tracked over a single episode."""
        self.cumulative_grid_reduction = 0.0
        self.cumulative_grid_reduction_peak = 0.0
        self.cumulative_degradation_cost = 0.0
        self.agent_cost_savings = np.zeros(self.num_agents)
        self.degradation_cost_timeseries = []
        self.cost_savings_timeseries = []
        self.grid_reduction_timeseries = []

    def get_grid_price(self, step_idx):
        """Return grid price for the current step based on selected state."""
        return self._get_price_function(step_idx)

    def _hour_of_step(self, step_idx):
        """Return the integer hour of day (0-23) of ``step_idx``, counted from the slice's first row."""
        minutes_per_step = 24 * 60 / self.steps_per_day
        return int((step_idx * minutes_per_step) // 60) % 24

    def _get_oklahoma_price(self, step_idx):
        """Oklahoma time-of-use price: 0.2112 for hours 14-18, otherwise 0.0434."""
        hour = self._hour_of_step(step_idx)
        if 14 <= hour < 19:
            return 0.2112
        return 0.0434

    def _get_colorado_price(self, step_idx):
        """Colorado price: 0.32 for hours 15-18, 0.22 for hours 13-14, otherwise 0.12."""
        hour = self._hour_of_step(step_idx)
        if 15 <= hour < 19:
            return 0.32
        if 13 <= hour < 15:
            return 0.22
        return 0.12

    def _get_pennsylvania_price(self, step_idx):
        """Pennsylvania price: 0.125048 for hours 13-20, 0.057014 for hours 23-5, otherwise 0.079085."""
        hour = self._hour_of_step(step_idx)
        if 13 <= hour < 21:
            return 0.125048
        if hour >= 23 or hour < 6:
            return 0.057014
        return 0.079085

    def get_peer_price(self, step_idx, total_surplus, total_shortfall):
        """Return the peer-to-peer price for a step.

        The price starts at 90% of the grid price and is scaled by
        ``exp(0.3 * (shortfall - surplus) / (shortfall + surplus))``: it rises
        when buyers dominate and falls when sellers dominate. The result is
        clipped into [feed-in tariff, grid price].
        """
        grid_price = self.get_grid_price(step_idx)
        base_price = grid_price * 0.90
        net_demand = total_shortfall - total_surplus
        total_potential_trade = total_shortfall + total_surplus + 1e-6
        elasticity_factor = 0.3
        price_multiplier = np.exp(elasticity_factor * (net_demand / total_potential_trade))
        peer_price = base_price * price_multiplier

        # Some tariffs price the grid below the feed-in tariff (Pennsylvania
        # off-peak). Capping the lower bound at the grid price keeps the band
        # valid; np.clip with inverted bounds already returned the upper
        # bound, so this only makes that outcome explicit.
        lower = min(self.feed_in_tariff, grid_price)
        return float(np.clip(peer_price, lower, grid_price))

    def reset(self):
        """Archive the previous episode's metrics and start a new, randomly chosen day.

        Returns:
            list: one float32 observation row (8 features) per agent.
        """
        if self.current_step > 0:
            # Jain's index over agents that actually saved money; fewer than
            # two such agents is reported as 0.0.
            positive_savings = self.agent_cost_savings[self.agent_cost_savings > 0]
            if len(positive_savings) > 1:
                fairness_on_savings = self._compute_jains_index(positive_savings)
            else:
                fairness_on_savings = 0.0

            self.episode_metrics = {
                "grid_reduction_entire_day": self.cumulative_grid_reduction,
                "grid_reduction_peak_hours": self.cumulative_grid_reduction_peak,
                "total_cost_savings": np.sum(self.agent_cost_savings),
                "fairness_on_cost_savings": fairness_on_savings,
                "battery_degradation_cost_total": self.cumulative_degradation_cost,
                "degradation_cost_over_time": self.degradation_cost_timeseries,
                "cost_savings_over_time": self.cost_savings_timeseries,
                "grid_reduction_over_time": self.grid_reduction_timeseries,
            }

        self.day_index = np.random.randint(0, self.total_days)
        start_row = self.day_index * self.steps_per_day
        end_row = start_row + self.steps_per_day
        day_data = self.all_data.iloc[start_row:end_row].copy()
        self.data = day_data

        self.no_p2p_import_day = {
            hid: self.original_no_p2p_import[hid][start_row:end_row]
            for hid in self.house_ids
        }

        # Demand is reconstructed as grid + solar (NaN -> 0, solar clipped at
        # 0), then clipped to be non-negative.
        self.demands = {}
        self.solars = {}
        for hid in self.house_ids:
            grid_series = day_data[f"grid_{hid}"].fillna(0.0)
            solar_series = day_data[f"total_solar_{hid}"].fillna(0.0).clip(lower=0.0)
            self.demands[hid] = np.clip(grid_series.values + solar_series.values, 0.0, None)
            self.solars[hid] = solar_series.values

        self.current_step = 0
        self.env_log = []
        for hid in self.house_ids:
            self.previous_actions[hid] = np.zeros(6)

        self._initialize_episode_metrics()

        # Random initial state of charge in [30%, 70%] of capacity.
        for batt in self.batteries.values():
            batt["soc"] = random.uniform(0.30 * batt["max_capacity"], 0.70 * batt["max_capacity"])

        obs = self._get_obs()
        return [obs[i] for i in range(self.num_agents)]

    def step(self, actions):
        """Advance the environment by one time step.

        Energy is settled in this order:
          1. battery discharge covers the house's own shortfall;
          2. the house's own surplus charges its battery;
          3. the peer market clears pro rata (sellers offer remaining surplus
             plus stored battery energy);
          4. grid buy/sell actions apply;
          5. any shortfall still left is force-imported from the grid.

        Args:
            actions: array-like of shape ``(num_agents, 6)``; values are
                clipped to [0, 1]. Columns: sell_grid, buy_grid, sell_peers,
                buy_peers, charge_batt, discharge_batt.

        Returns:
            tuple: ``(obs_list, rewards_list, done, info)``. ``done`` is True
            once ``num_steps`` steps have run.

        Raises:
            ValueError: if ``actions`` has the wrong shape.
        """
        actions = np.array(actions, dtype=np.float32)
        if actions.shape != (self.num_agents, 6):
            raise ValueError(f"Actions shape mismatch: got {actions.shape}, expected {(self.num_agents, 6)}")
        actions = np.clip(actions, 0.0, 1.0)

        a_sellGrid = actions[:, 0]
        a_buyGrid = actions[:, 1]
        a_sellPeers = actions[:, 2]
        a_buyPeers = actions[:, 3]
        a_chargeBatt = actions[:, 4]
        a_dischargeBatt = actions[:, 5]

        t = self.current_step
        demands = np.array([self.demands[hid][t] for hid in self.house_ids], dtype=np.float32)
        solars = np.array([self.solars[hid][t] for hid in self.house_ids], dtype=np.float32)

        shortfall = np.maximum(demands - solars, 0.0)
        surplus = np.maximum(solars - demands, 0.0)
        total_surplus = shortfall.dtype.type(surplus.sum())
        total_shortfall = shortfall.sum()
        peer_price = self.get_peer_price(t, total_surplus, total_shortfall)
        grid_price = self.get_grid_price(t)

        final_shortfall = shortfall.copy()
        final_surplus = surplus.copy()
        grid_import = np.zeros(self.num_agents, dtype=np.float32)
        grid_export = np.zeros(self.num_agents, dtype=np.float32)

        # 1. Discharge to cover own shortfall (rate-, energy- and need-limited).
        discharge_amount = np.zeros(self.num_agents, dtype=np.float32)
        for i, hid in enumerate(self.house_ids):
            if hid in self.batteries:
                batt = self.batteries[hid]
                available = batt["soc"] * batt["discharge_efficiency"]
                desired = a_dischargeBatt[i] * batt["max_discharge_rate"]
                actual = min(desired, available, final_shortfall[i])
                batt["soc"] -= actual / batt["discharge_efficiency"]
                final_shortfall[i] -= actual
                discharge_amount[i] = actual

        # 2. Charge from own surplus (rate-, headroom- and surplus-limited).
        charge_amount = np.zeros(self.num_agents, dtype=np.float32)
        for i, hid in enumerate(self.house_ids):
            if hid in self.batteries:
                batt = self.batteries[hid]
                cap_left = batt["max_capacity"] - batt["soc"]
                desired = a_chargeBatt[i] * batt["max_charge_rate"]
                actual = min(desired, cap_left / batt["charge_efficiency"], final_surplus[i])
                batt["soc"] += actual * batt["charge_efficiency"]
                final_surplus[i] -= actual
                charge_amount[i] = actual

        # 3. Peer market. Sellers may also offer stored battery energy.
        # NOTE(review): battery_offer is not capped by max_discharge_rate, and
        # battery energy sold to peers is not included in discharge_amount
        # (so it incurs no degradation cost) — confirm this is intended.
        battery_offer = np.zeros(self.num_agents, dtype=np.float32)
        for i, hid in enumerate(self.house_ids):
            if hid in self.batteries:
                batt = self.batteries[hid]
                battery_offer[i] = batt["soc"] * batt["discharge_efficiency"]
        effective_surplus = final_surplus + battery_offer

        netPeer = a_buyPeers - a_sellPeers
        p2p_buy_request = np.zeros(self.num_agents, dtype=np.float32)
        p2p_sell_offer = np.zeros(self.num_agents, dtype=np.float32)
        for i in range(self.num_agents):
            if netPeer[i] > 0:
                p2p_buy_request[i] = netPeer[i] * final_shortfall[i]
            elif netPeer[i] < 0:
                p2p_sell_offer[i] = -netPeer[i] * effective_surplus[i]

        # Clear min(total supply, total demand), split pro rata on each side.
        total_sell = np.sum(p2p_sell_offer)
        total_buy = np.sum(p2p_buy_request)
        matched = min(total_sell, total_buy)
        if matched > 1e-9:
            actual_sold = matched * (p2p_sell_offer / (total_sell + 1e-12))
            actual_bought = matched * (p2p_buy_request / (total_buy + 1e-12))
        else:
            actual_sold = np.zeros(self.num_agents, dtype=np.float32)
            actual_bought = np.zeros(self.num_agents, dtype=np.float32)

        # Sales draw on the battery first, then on remaining solar surplus.
        from_batt_p2p = np.minimum(actual_sold, battery_offer)
        from_solar_p2p = actual_sold - from_batt_p2p
        final_surplus -= from_solar_p2p
        final_shortfall -= actual_bought

        for i, hid in enumerate(self.house_ids):
            if hid in self.batteries:
                batt = self.batteries[hid]
                batt["soc"] -= from_batt_p2p[i] / batt["discharge_efficiency"]
                batt["soc"] = max(0.0, batt["soc"])

        # 4./5. Grid actions, then force-import whatever shortfall remains.
        # Surplus that is neither sold nor exported is left unused.
        netGrid = a_buyGrid - a_sellGrid
        for i in range(self.num_agents):
            if netGrid[i] > 0:
                grid_import[i] = netGrid[i] * final_shortfall[i]
            elif netGrid[i] < 0:
                grid_export[i] = -netGrid[i] * final_surplus[i]
        grid_import += np.maximum(final_shortfall - grid_import, 0.0)

        costs = (grid_import * grid_price) - (grid_export * self.feed_in_tariff) + \
            (actual_bought * peer_price) - (actual_sold * peer_price)

        final_rewards = self._compute_rewards(
            grid_import=grid_import, grid_export=grid_export,
            actual_sold=actual_sold, actual_bought=actual_bought,
            charge_amount=charge_amount, discharge_amount=discharge_amount,
            costs=costs, grid_price=grid_price, peer_price=peer_price,
        )

        # Metrics relative to the standalone (no-trading) grid import baseline.
        no_p2p_import_this_step = np.array([
            self.no_p2p_import_day[hid][t] for hid in self.house_ids
        ], dtype=np.float32)

        step_grid_reduction = np.sum(no_p2p_import_this_step - grid_import)
        self.cumulative_grid_reduction += step_grid_reduction
        self.grid_reduction_timeseries.append(step_grid_reduction)

        # A step counts as "peak" when its price is within 1% of the tariff peak.
        if grid_price >= self.max_grid_price * 0.99:
            self.cumulative_grid_reduction_peak += step_grid_reduction

        cost_no_p2p = no_p2p_import_this_step * grid_price
        step_cost_savings_per_agent = cost_no_p2p - costs
        self.agent_cost_savings += step_cost_savings_per_agent
        self.cost_savings_timeseries.append(np.sum(step_cost_savings_per_agent))

        step_degradation_cost = 0.0
        for i, hid in enumerate(self.house_ids):
            if hid in self.batteries:
                step_degradation_cost += (charge_amount[i] + discharge_amount[i]) * self.batteries[hid]["degradation_cost_per_kwh"]
        self.cumulative_degradation_cost += step_degradation_cost
        self.degradation_cost_timeseries.append(step_degradation_cost)

        info = {
            "p2p_buy": actual_bought,
            "p2p_sell": actual_sold,
            "grid_import_with_p2p": grid_import,
            "grid_import_no_p2p": no_p2p_import_this_step,
            "grid_export": grid_export,
            "costs": costs,
            "charge_amount": charge_amount,
            "discharge_amount": discharge_amount,
            "step": t,
            "step_grid_reduction": step_grid_reduction,
            "step_cost_savings": np.sum(step_cost_savings_per_agent),
            "step_degradation_cost": step_degradation_cost,
        }

        # One row per step, in the column order save_log() writes
        # (previously nothing appended here, so save_log() wrote an empty CSV).
        self.env_log.append([
            t,
            float(np.sum(grid_import)),
            float(np.sum(grid_export)),
            float(np.sum(actual_bought)),
            float(np.sum(actual_sold)),
            float(np.sum(costs)),
        ])

        self.current_step += 1
        done = (self.current_step >= self.num_steps)

        obs_next = self._get_obs()
        obs_next_list = [obs_next[i] for i in range(self.num_agents)]
        rewards_list = [final_rewards[i] for i in range(self.num_agents)]
        return obs_next_list, rewards_list, done, info

    def _get_obs(self):
        """Build the (num_agents, 8) float32 observation matrix for the current step.

        The step index is clamped to the last step of the day, so the
        observation after the final step repeats that last step.
        """
        step = min(self.current_step, self.num_steps - 1)

        demands = np.array([self.demands[hid][step] for hid in self.house_ids], dtype=np.float32)
        solars = np.array([self.solars[hid][step] for hid in self.house_ids], dtype=np.float32)

        surplus = np.maximum(solars - demands, 0.0)
        shortfall = np.maximum(demands - solars, 0.0)
        total_surplus = float(surplus.sum())
        total_shortfall = float(shortfall.sum())

        grid_price = self.get_grid_price(step)
        peer_price = self.get_peer_price(step, total_surplus, total_shortfall)

        # NOTE(review): this hour comes from the UTC-converted index, while
        # prices use an hour counted from the slice's first row. The two agree
        # only if each day slice starts at 00:00 UTC — confirm the intended
        # timezone.
        ts = self.data.index[step]
        hour = ts.hour + ts.minute / 60.0

        total_demand = demands.sum()
        total_solar = solars.sum()
        obs = []
        for i, hid in enumerate(self.house_ids):
            own_demand = demands[i]
            own_solar = solars[i]
            if hid in self.batteries:
                soc_frac = self.batteries[hid]["soc"] / self.batteries[hid]["max_capacity"]
            else:
                soc_frac = -1.0  # sentinel: house has no battery
            obs.append([
                own_demand,
                own_solar,
                soc_frac,
                grid_price,
                peer_price,
                float(total_demand - own_demand),
                float(total_solar - own_solar),
                hour,
            ])
        return np.array(obs, dtype=np.float32)

    def _compute_jains_index(self, usage_array):
        """Jain's fairness index: (sum x)^2 / (n * sum x^2).

        Returns 0.0 for empty or all-zero input (the 1e-8 guards the division).
        """
        x = np.array(usage_array, dtype=np.float32)
        numerator = (np.sum(x)) ** 2
        denominator = len(x) * np.sum(x ** 2) + 1e-8
        return numerator / denominator

    def _compute_rewards(
        self,
        grid_import,
        grid_export,
        actual_sold,
        actual_bought,
        charge_amount,
        discharge_amount,
        costs,
        grid_price,
        peer_price,
    ):
        """Return per-agent float32 rewards.

        reward = -w7*cost - w1*import*p_grid_norm + w2*sold*p_peer_norm
                 + w3*bought*(grid-peer)/max_grid (only if peer < grid)
                 - w4*(soc_frac-0.5)^2 - w5*degradation + w6*JFI,
        where JFI is Jain's index over all agents' P2P volumes and is
        shared by every agent.
        """
        w1 = 0.3; w2 = 0.5; w3 = 0.5; w4 = 0.1; w5 = 0.05; w6 = 0.4; w7 = 1.0

        jfi = self._compute_jains_index(actual_bought + actual_sold)
        p_grid_norm = grid_price / self.max_grid_price
        p_peer_norm = peer_price / self.max_grid_price

        rewards = np.zeros(self.num_agents, dtype=np.float32)
        for i, hid in enumerate(self.house_ids):
            reward = - costs[i] * w7

            grid_penalty = w1 * grid_import[i] * p_grid_norm
            p2p_sell_bonus = w2 * actual_sold[i] * p_peer_norm
            if peer_price < grid_price:
                p2p_buy_bonus = w3 * actual_bought[i] * ((grid_price - peer_price) / self.max_grid_price)
            else:
                p2p_buy_bonus = 0.0

            if hid in self.batteries:
                batt = self.batteries[hid]
                soc_frac = batt["soc"] / batt["max_capacity"]
                soc_penalty = w4 * (soc_frac - 0.5) ** 2
                degradation_penalty = w5 * (charge_amount[i] + discharge_amount[i]) * batt["degradation_cost_per_kwh"]
            else:
                soc_penalty = degradation_penalty = 0.0

            fairness_bonus = w6 * jfi

            reward += (
                - grid_penalty
                + p2p_sell_bonus
                + p2p_buy_bonus
                - soc_penalty
                - degradation_penalty
                + fairness_bonus
            )
            rewards[i] = reward

        return rewards

    def get_episode_metrics(self):
        """
        Return performance metrics for the last completed episode.

        The metrics are filled in by the reset() call that follows an
        episode, so call this after env.reset(). Before any episode has
        finished it returns an empty dict.
        """
        return self.episode_metrics

    def save_log(self, filename="env_log.csv"):
        """Save the per-step environment log (one row per step() call) to CSV."""
        columns = [
            "Step", "Total_Grid_Import", "Total_Grid_Export",
            "Total_P2P_Buy", "Total_P2P_Sell", "Total_Cost",
        ]
        df = pd.DataFrame(self.env_log, columns=columns)
        df.to_csv(filename, index=False)
        print(f"Environment log saved to {filename}")