import gym
import pandas as pd
import numpy as np
from collections import deque
import random

random.seed(42)
np.random.seed(42)


class SolarSys(gym.Env):
    """Multi-agent residential solar / battery / P2P energy-trading environment.

    Each episode is one randomly chosen day taken from a CSV that holds, per
    house, a ``grid_<id>`` column and a ``total_solar_<id>`` column, indexed by
    the ``local_15min`` timestamp column and resampled to ``time_freq``.  On
    every step each agent submits six fractions in [0, 1]
    (sell_to_grid, buy_from_grid, sell_to_peers, buy_from_peers,
    charge_battery, discharge_battery) that decide how its residual
    shortfall / surplus is settled through its battery, peers and the grid.
    """

    def __init__(
        self,
        data_path="/path/to/project/training/200houses_152days_TRAIN.csv",
        state="oklahoma",  # for Oklahoma (example)
        time_freq="15T",  # "15T", "30T", "1H", "3H", "6H"
    ):
        """Load and resample the data; configure pricing, batteries and spaces.

        Args:
            data_path: CSV file with a ``local_15min`` timestamp column plus
                ``grid_<id>`` and ``total_solar_<id>`` columns for every house.
            state: Pricing region, one of ``"oklahoma"``, ``"colorado"``,
                ``"pennsylvania"`` (case-insensitive).
            time_freq: pandas offset alias used to resample the data.

        Raises:
            ValueError: Unsupported ``state``, empty/unparseable data file, or
                fewer rows than one day after resampling.
            FileNotFoundError: ``data_path`` does not exist.
        """
        super().__init__()

        # Store config
        self.data_path = data_path
        self.time_freq = time_freq
        self.state = state.lower()

        # Centralized pricing configuration: per-state price cap, feed-in
        # tariff and time-of-use price function.
        self._pricing_info = {
            "oklahoma": {
                "max_grid_price": 0.2112,
                "feed_in_tariff": 0.04,
                "price_function": self._get_oklahoma_price,
            },
            "colorado": {
                "max_grid_price": 0.32,
                "feed_in_tariff": 0.055,
                "price_function": self._get_colorado_price,
            },
            "pennsylvania": {
                "max_grid_price": 0.12505,
                "feed_in_tariff": 0.06,
                "price_function": self._get_pennsylvania_price,
            },
        }

        if self.state not in self._pricing_info:
            raise ValueError(
                f"State '{self.state}' is not supported. "
                f"Available states: {list(self._pricing_info.keys())}"
            )

        state_config = self._pricing_info[self.state]
        self.max_grid_price = state_config["max_grid_price"]
        self.feed_in_tariff = state_config["feed_in_tariff"]
        self._get_price_function = state_config["price_function"]

        try:
            all_data = pd.read_csv(data_path)
            all_data["local_15min"] = pd.to_datetime(all_data["local_15min"], utc=True)
            all_data.set_index("local_15min", inplace=True)
            all_data = all_data.resample(time_freq).mean()
        except FileNotFoundError as err:
            raise FileNotFoundError(f"Data file {data_path} not found.") from err
        except pd.errors.EmptyDataError as err:
            raise ValueError(f"Data file {data_path} is empty.") from err
        except Exception as err:
            raise ValueError(f"Error loading data: {err}") from err

        # Compute global maxima for normalization
        grid_cols = [c for c in all_data.columns if c.startswith("grid_")]
        solar_cols = [c for c in all_data.columns if c.startswith("total_solar_")]
        all_grid = all_data[grid_cols].values
        all_solar = all_data[solar_cols].values
        # resample().mean() leaves NaN in empty bins; plain .max() would turn
        # the whole normalizer into NaN, so ignore NaNs explicitly.
        # max total demand = max(grid + solar) over all time & agents
        self.global_max_demand = float(np.nanmax(all_grid + all_solar)) + 1e-8
        # max solar generation alone
        self.global_max_solar = float(np.nanmax(all_solar)) + 1e-8

        # Store the resampled dataset
        self.all_data = all_data

        freq_offset = pd.tseries.frequencies.to_offset(time_freq)
        minutes_per_step = freq_offset.nanos / 1e9 / 60.0
        self.steps_per_day = int(24 * 60 // minutes_per_step)
        total_rows = len(self.all_data)
        self.total_days = total_rows // self.steps_per_day
        if self.total_days < 1:
            raise ValueError(
                f"After resampling, dataset has {total_rows} rows, which is "
                f"less than a single day of {self.steps_per_day} steps."
            )

        self.house_ids = [
            col.split("_")[1] for col in self.all_data.columns
            if col.startswith("grid_")
        ]
        self.num_agents = len(self.house_ids)

        # Baseline (no-P2P) grid import per house: positive part of the grid
        # column.  fillna(0.0) keeps this consistent with the demand arrays
        # built in reset(); without it NaN bins poison the reduction/savings
        # metrics computed in step().
        self.original_no_p2p_import = {}
        for hid in self.house_ids:
            col_grid = f"grid_{hid}"
            self.original_no_p2p_import[hid] = (
                self.all_data[col_grid].fillna(0.0).clip(lower=0.0).values
            )

        # Determine population groups
        # group 1 = has any solar; group 0 = never solar
        solar_cols = [f"total_solar_{hid}" for hid in self.house_ids]
        solar_sums = self.all_data[solar_cols].sum(axis=0).to_dict()
        self.agent_groups = [
            1 if solar_sums[f"total_solar_{hid}"] > 0 else 0
            for hid in self.house_ids
        ]

        # Count the number of houses in each group
        self.group_counts = {
            0: self.agent_groups.count(0),
            1: self.agent_groups.count(1),
        }
        print(f"Number of houses in each group: {self.group_counts}")

        # Battery logic
        self.battery_options = {
            "teslapowerwall": {"max_capacity": 13.5, "charge_efficiency": 0.95,
                               "discharge_efficiency": 0.90, "max_charge_rate": 5.0,
                               "max_discharge_rate": 5.0,
                               "degradation_cost_per_kwh": 0.005},
            "enphase": {"max_capacity": 5.0, "charge_efficiency": 0.95,
                        "discharge_efficiency": 0.90, "max_charge_rate": 2.0,
                        "max_discharge_rate": 2.0,
                        "degradation_cost_per_kwh": 0.005},
            "franklin": {"max_capacity": 15.0, "charge_efficiency": 0.95,
                         "discharge_efficiency": 0.90, "max_charge_rate": 6.0,
                         "max_discharge_rate": 6.0,
                         "degradation_cost_per_kwh": 0.005},
        }

        # Identify which houses actually have solar
        self.solar_houses = [
            hid for hid in self.house_ids
            if (self.all_data[f"total_solar_{hid}"] > 0).any()
        ]

        # Assign a random battery type to each solar-equipped house
        self.batteries = {}
        for hid in self.solar_houses:
            choice = random.choice(list(self.battery_options))
            specs = self.battery_options[choice]
            self.batteries[hid] = {"soc": 0.0, **specs}

        # Observation & Action Spaces
        # Observation row per agent, in the order built by _get_obs():
        # [own_demand, own_solar, SOC_fraction (-1 if no battery), grid_price,
        #  peer_price, total_demand_others, total_solar_others, time_of_day]
        self.observation_space = gym.spaces.Box(
            low=-np.inf, high=np.inf,
            shape=(self.num_agents, 8), dtype=np.float32
        )

        # [sell_to_grid, buy_from_grid, sell_to_peers, buy_from_peers, charge_battery, discharge_battery]
        self.action_space = gym.spaces.Box(
            low=0.0, high=1.0,
            shape=(self.num_agents, 6), dtype=np.float32
        )

        self.episode_metrics = {}
        self._initialize_episode_metrics()

        # Initialize episode variables
        self.data = None
        self.env_log = []
        self.day_index = -1
        self.current_step = 0
        self.num_steps = self.steps_per_day
        self.demands = {}
        self.solars = {}
        self.previous_actions = {
            hid: np.zeros(6) for hid in self.house_ids
        }

    def _initialize_episode_metrics(self):
        """Initialize or reset all metrics tracked over a single episode."""
        self.cumulative_grid_reduction = 0.0
        self.cumulative_grid_reduction_peak = 0.0
        self.cumulative_degradation_cost = 0.0
        self.agent_cost_savings = np.zeros(self.num_agents)
        self.degradation_cost_timeseries = []
        self.cost_savings_timeseries = []
        self.grid_reduction_timeseries = []

    # Price Functions
    def get_grid_price(self, step_idx):
        """Return grid price for the current step based on selected state."""
        return self._get_price_function(step_idx)

    def _get_oklahoma_price(self, step_idx):
        """Oklahoma Gas & Electric (OG&E) TOU price for ``step_idx``.

        The hour is derived from the step index, i.e. step 0 is treated as
        hour 0 of the day.
        """
        minutes_per_step = 24 * 60 / self.steps_per_day
        hour = int((step_idx * minutes_per_step) // 60) % 24

        # Peak: 2 pm to 7 pm
        if 14 <= hour < 19:
            return 0.2112
        # Off-peak: All other times
        else:
            return 0.0434

    def _get_colorado_price(self, step_idx):
        """Xcel Energy Colorado TOU price for ``step_idx`` (step 0 = hour 0)."""
        minutes_per_step = 24 * 60 / self.steps_per_day
        hour = int((step_idx * minutes_per_step) // 60) % 24

        # On-peak: 3 pm to 7 pm
        if 15 <= hour < 19:
            return 0.32
        # Mid-peak: 1 pm to 3 pm
        elif 13 <= hour < 15:
            return 0.22
        # Off-peak: Before 1 pm and after 7 pm
        else:
            return 0.12

    def _get_pennsylvania_price(self, step_idx):
        """Duquesne Light (Pennsylvania) EV TOU price for ``step_idx`` (step 0 = hour 0)."""
        minutes_per_step = 24 * 60 / self.steps_per_day
        hour = int((step_idx * minutes_per_step) // 60) % 24

        # Peak: 1 pm to 9 pm
        if 13 <= hour < 21:
            return 0.125048
        # Super Off-Peak: 11 pm to 6 am
        elif hour >= 23 or hour < 6:
            return 0.057014
        # Off-Peak: 6 am to 1 pm and 9 pm to 11 pm
        else:
            return 0.079085

    def get_peer_price(self, step_idx, total_surplus, total_shortfall):
        """Return the P2P trading price for ``step_idx``.

        Starts at 90% of the grid price and scales it by
        ``exp(0.3 * (shortfall - surplus) / (shortfall + surplus))``, so net
        demand raises the price and net supply lowers it.  The result is
        clipped to ``[feed_in_tariff, grid_price]``.

        Note: where the tariff exceeds the grid price (e.g. Pennsylvania
        super off-peak, 0.057014 < 0.06), ``np.clip`` receives
        ``a_min > a_max``; numpy documents that it then returns ``a_max``,
        i.e. the grid price.
        """
        grid_price = self.get_grid_price(step_idx)
        feed_in_tariff = self.feed_in_tariff
        base_price = grid_price * 0.90
        net_demand = total_shortfall - total_surplus
        total_potential_trade = total_shortfall + total_surplus + 1e-6
        elasticity_factor = 0.3
        price_multiplier = np.exp(elasticity_factor * (net_demand / total_potential_trade))
        peer_price = base_price * price_multiplier
        final_price = float(np.clip(peer_price, feed_in_tariff, grid_price))
        return final_price

    def reset(self):
        """Start a new episode on a randomly chosen day.

        If the previous episode took at least one step, its totals are first
        stored in ``self.episode_metrics``.  Then a day index is sampled, the
        per-house demand (grid + clipped solar, floored at 0) and solar arrays
        are rebuilt, and episode metrics and previous actions are cleared.
        Battery SOCs are redrawn uniformly in 30%-70% of capacity.

        Returns:
            list of per-agent observation rows (length ``num_agents``).
        """
        # Finalize and store metrics from completed episode before resetting
        if self.current_step > 0:
            positive_savings = self.agent_cost_savings[self.agent_cost_savings > 0]
            if len(positive_savings) > 1:
                fairness_on_savings = self._compute_jains_index(positive_savings)
            else:
                fairness_on_savings = 0.0

            # Store all final metrics
            self.episode_metrics = {
                "grid_reduction_entire_day": self.cumulative_grid_reduction,
                "grid_reduction_peak_hours": self.cumulative_grid_reduction_peak,
                "total_cost_savings": np.sum(self.agent_cost_savings),
                "fairness_on_cost_savings": fairness_on_savings,
                "battery_degradation_cost_total": self.cumulative_degradation_cost,
                "degradation_cost_over_time": self.degradation_cost_timeseries,
                "cost_savings_over_time": self.cost_savings_timeseries,
                "grid_reduction_over_time": self.grid_reduction_timeseries,
            }

        self.day_index = np.random.randint(0, self.total_days)
        start_row = self.day_index * self.steps_per_day
        end_row = start_row + self.steps_per_day
        day_data = self.all_data.iloc[start_row:end_row].copy()
        self.data = day_data

        self.no_p2p_import_day = {}
        for hid in self.house_ids:
            self.no_p2p_import_day[hid] = self.original_no_p2p_import[hid][start_row:end_row]

        self.demands = {}
        self.solars = {}
        for hid in self.house_ids:
            col_grid = f"grid_{hid}"
            col_solar = f"total_solar_{hid}"

            grid_series = day_data[col_grid].fillna(0.0)
            solar_series = day_data[col_solar].fillna(0.0).clip(lower=0.0)

            demand_array = grid_series.values + solar_series.values
            demand_array = np.clip(demand_array, 0.0, None)

            self.demands[hid] = demand_array
            self.solars[hid] = solar_series.values

        self.current_step = 0
        self.env_log = []

        # Reset previous_actions to 6 zeros
        for hid in self.house_ids:
            self.previous_actions[hid] = np.zeros(6)

        self._initialize_episode_metrics()

        # Randomize battery SOC between 30%–70% of capacity
        for hid, batt in self.batteries.items():
            low = 0.30 * batt["max_capacity"]
            high = 0.70 * batt["max_capacity"]
            batt["soc"] = random.uniform(low, high)

        obs = self._get_obs()
        obs_list = [obs[i] for i in range(self.num_agents)]
        return obs_list

    def step(self, actions):
        """Advance the environment by one time step.

        Settlement order per agent: self-use of solar, battery discharge to
        cover the shortfall, battery charge from the surplus, P2P matching
        (pro-rata), then grid import/export.  Any shortfall still uncovered is
        force-imported from the grid.

        Args:
            actions: array-like of shape ``(num_agents, 6)``; values are
                clipped to [0, 1].

        Returns:
            ``(obs_list, rewards_list, done, info)``.

        Raises:
            ValueError: if ``actions`` has the wrong shape.
        """
        # Validate & clamp actions
        actions = np.array(actions, dtype=np.float32)
        if actions.shape != (self.num_agents, 6):
            raise ValueError(f"Actions shape mismatch: got {actions.shape}, expected {(self.num_agents, 6)}")
        actions = np.clip(actions, 0.0, 1.0)

        a_sellGrid = actions[:, 0]
        a_buyGrid = actions[:, 1]
        a_sellPeers = actions[:, 2]
        a_buyPeers = actions[:, 3]
        a_chargeBatt = actions[:, 4]
        a_dischargeBatt = actions[:, 5]

        # Gather current demand & solar
        demands = np.array(
            [self.demands[hid][self.current_step] for hid in self.house_ids],
            dtype=np.float32,
        )
        solars = np.array(
            [self.solars[hid][self.current_step] for hid in self.house_ids],
            dtype=np.float32,
        )

        # Enforce "self-use first": only the residual is traded.
        shortfall = np.maximum(demands - solars, 0.0)
        surplus = np.maximum(solars - demands, 0.0)

        # Calculations for peer_price and grid_price
        total_surplus = surplus.sum()
        total_shortfall = shortfall.sum()
        peer_price = self.get_peer_price(self.current_step, total_surplus, total_shortfall)
        grid_price = self.get_grid_price(self.current_step)

        final_shortfall = shortfall.copy()
        final_surplus = surplus.copy()
        grid_import = np.zeros(self.num_agents, dtype=np.float32)
        grid_export = np.zeros(self.num_agents, dtype=np.float32)

        # Battery discharge (covers own shortfall only).
        # NOTE(review): max_*_rate is applied as energy per step regardless of
        # time_freq — confirm the intended units.
        discharge_amount = np.zeros(self.num_agents, dtype=np.float32)
        for i, hid in enumerate(self.house_ids):
            if hid in self.batteries:
                batt = self.batteries[hid]
                max_dis = batt["max_discharge_rate"]
                available = batt["soc"] * batt["discharge_efficiency"]
                desired = a_dischargeBatt[i] * max_dis
                actual = min(desired, available, final_shortfall[i])
                batt["soc"] -= actual / batt["discharge_efficiency"]
                final_shortfall[i] -= actual
                discharge_amount[i] = actual

        # Battery charge (from own surplus only)
        charge_amount = np.zeros(self.num_agents, dtype=np.float32)
        for i, hid in enumerate(self.house_ids):
            if hid in self.batteries:
                batt = self.batteries[hid]
                max_ch = batt["max_charge_rate"]
                cap_left = batt["max_capacity"] - batt["soc"]
                desired = a_chargeBatt[i] * max_ch
                actual = min(desired, cap_left / batt["charge_efficiency"], final_surplus[i])
                batt["soc"] += actual * batt["charge_efficiency"]
                final_surplus[i] -= actual
                charge_amount[i] = actual

        # P2P matching.
        # NOTE(review): the battery's whole deliverable energy is offered to
        # peers, without the max_discharge_rate cap applied above, and
        # battery energy sold to peers is not added to discharge_amount, so it
        # incurs no degradation cost/penalty — confirm this is intended.
        battery_offer = np.zeros(self.num_agents, dtype=np.float32)
        for i, hid in enumerate(self.house_ids):
            if hid in self.batteries:
                battery_offer[i] = self.batteries[hid]["soc"] * self.batteries[hid]["discharge_efficiency"]

        effective_surplus = final_surplus + battery_offer
        netPeer = a_buyPeers - a_sellPeers
        p2p_buy_request = np.zeros(self.num_agents, dtype=np.float32)
        p2p_sell_offer = np.zeros(self.num_agents, dtype=np.float32)
        for i in range(self.num_agents):
            if netPeer[i] > 0:
                p2p_buy_request[i] = netPeer[i] * final_shortfall[i]
            elif netPeer[i] < 0:
                p2p_sell_offer[i] = -netPeer[i] * effective_surplus[i]

        total_sell = np.sum(p2p_sell_offer)
        total_buy = np.sum(p2p_buy_request)
        matched = min(total_sell, total_buy)

        if matched > 1e-9:
            # Pro-rata allocation of the matched volume on both sides.
            sell_fraction = p2p_sell_offer / (total_sell + 1e-12)
            buy_fraction = p2p_buy_request / (total_buy + 1e-12)
            actual_sold = matched * sell_fraction
            actual_bought = matched * buy_fraction
        else:
            actual_sold = np.zeros(self.num_agents, dtype=np.float32)
            actual_bought = np.zeros(self.num_agents, dtype=np.float32)

        # Sales are attributed to the battery first, solar surplus second.
        from_batt_p2p = np.minimum(actual_sold, battery_offer)
        from_solar_p2p = actual_sold - from_batt_p2p

        # Update balances
        final_surplus -= from_solar_p2p
        final_shortfall -= actual_bought

        # Deduct peer battery sales from SOC
        for i, hid in enumerate(self.house_ids):
            if hid in self.batteries:
                from_batt = min(actual_sold[i], battery_offer[i])
                self.batteries[hid]["soc"] -= from_batt / self.batteries[hid]["discharge_efficiency"]
                self.batteries[hid]["soc"] = max(0.0, self.batteries[hid]["soc"])

        # Grid trades
        netGrid = a_buyGrid - a_sellGrid
        for i in range(self.num_agents):
            if netGrid[i] > 0:
                grid_import[i] = netGrid[i] * final_shortfall[i]
            elif netGrid[i] < 0:
                grid_export[i] = -netGrid[i] * final_surplus[i]

        # Whatever shortfall remains must come from the grid.
        forced = np.maximum(final_shortfall - grid_import, 0.0)
        grid_import += forced

        # Calculate costs
        costs = (grid_import * grid_price) - (grid_export * self.feed_in_tariff) + \
                (actual_bought * peer_price) - (actual_sold * peer_price)

        # Calculate rewards
        final_rewards = self._compute_rewards(
            grid_import=grid_import,
            grid_export=grid_export,
            actual_sold=actual_sold,
            actual_bought=actual_bought,
            charge_amount=charge_amount,
            discharge_amount=discharge_amount,
            costs=costs,
            grid_price=grid_price,
            peer_price=peer_price,
        )

        # Metric calculations for the current step
        no_p2p_import_this_step = np.array([
            self.no_p2p_import_day[hid][self.current_step]
            for hid in self.house_ids
        ], dtype=np.float32)

        # Grid Reduction metrics
        step_grid_reduction = np.sum(no_p2p_import_this_step - grid_import)
        self.cumulative_grid_reduction += step_grid_reduction
        self.grid_reduction_timeseries.append(step_grid_reduction)

        # Check if current grid price corresponds to peak hour
        if grid_price >= self.max_grid_price * 0.99:
            self.cumulative_grid_reduction_peak += step_grid_reduction

        # Cost Savings
        cost_no_p2p = no_p2p_import_this_step * grid_price
        step_cost_savings_per_agent = cost_no_p2p - costs
        self.agent_cost_savings += step_cost_savings_per_agent
        self.cost_savings_timeseries.append(np.sum(step_cost_savings_per_agent))

        # Battery Degradation Cost
        step_degradation_cost = 0.0
        for i, hid in enumerate(self.house_ids):
            if hid in self.batteries:
                batt = self.batteries[hid]
                degradation_cost_agent = (charge_amount[i] + discharge_amount[i]) * batt["degradation_cost_per_kwh"]
                step_degradation_cost += degradation_cost_agent
        self.cumulative_degradation_cost += step_degradation_cost
        self.degradation_cost_timeseries.append(step_degradation_cost)

        info = {
            "p2p_buy": actual_bought,
            "p2p_sell": actual_sold,
            "grid_import_with_p2p": grid_import,
            "grid_import_no_p2p": no_p2p_import_this_step,
            "grid_export": grid_export,
            "costs": costs,
            "charge_amount": charge_amount,
            "discharge_amount": discharge_amount,
            "step": self.current_step,
            "step_grid_reduction": step_grid_reduction,
            "step_cost_savings": np.sum(step_cost_savings_per_agent),
            "step_degradation_cost": step_degradation_cost,
        }

        # Record per-step totals so save_log() has data to write; the row
        # order matches save_log's column list.
        self.env_log.append([
            self.current_step,
            float(np.sum(grid_import)),
            float(np.sum(grid_export)),
            float(np.sum(actual_bought)),
            float(np.sum(actual_sold)),
            float(np.sum(costs)),
        ])

        # Increment step & decide "done"
        self.current_step += 1
        done = (self.current_step >= self.num_steps)

        # Return next obs, reward list, done, info
        obs_next = self._get_obs()
        obs_next_list = [obs_next[i] for i in range(self.num_agents)]
        rewards_list = [final_rewards[i] for i in range(self.num_agents)]

        return obs_next_list, rewards_list, done, info

    def _get_obs(self):
        """Build the ``(num_agents, 8)`` float32 observation array.

        Row layout: [own_demand, own_solar, SOC fraction (-1.0 if no battery),
        grid_price, peer_price, demand of others, solar of others, hour].
        The step index is clamped to the last step so this also works right
        after the final step of an episode.
        """
        step = min(self.current_step, self.num_steps - 1)

        # Gather per-agent demand/solar into arrays
        demands = np.array([self.demands[hid][step] for hid in self.house_ids], dtype=np.float32)
        solars = np.array([self.solars[hid][step] for hid in self.house_ids], dtype=np.float32)

        # Compute market aggregates for dynamic pricing
        surplus = np.maximum(solars - demands, 0.0)
        shortfall = np.maximum(demands - solars, 0.0)
        total_surplus = float(surplus.sum())
        total_shortfall = float(shortfall.sum())

        grid_price = self.get_grid_price(step)
        peer_price = self.get_peer_price(step, total_surplus, total_shortfall)

        # Compute time-of-day feature.
        # NOTE(review): the index was parsed with utc=True, so this is the UTC
        # hour, whereas the price functions derive the hour from the step
        # index (step 0 = hour 0).  They agree only if each day slice starts
        # at 00:00 UTC — confirm.
        ts = self.data.index[step]
        hour = ts.hour + ts.minute / 60.0

        # Build per-agent obs
        obs = []
        for i, hid in enumerate(self.house_ids):
            own_demand = demands[i]
            own_solar = solars[i]

            # State-of-charge fraction (0–1), -1 for non-battery agents
            if hid in self.batteries:
                soc_frac = self.batteries[hid]["soc"] / self.batteries[hid]["max_capacity"]
            else:
                soc_frac = -1.0

            obs.append([
                own_demand,
                own_solar,
                soc_frac,
                grid_price,
                peer_price,
                float(demands.sum() - own_demand),
                float(solars.sum() - own_solar),
                hour,
            ])

        return np.array(obs, dtype=np.float32)

    def _compute_jains_index(self, usage_array):
        """Jain's fairness index: (sum x)^2 / (n * sum x^2); 0 for all-zero input."""
        x = np.array(usage_array, dtype=np.float32)
        numerator = (np.sum(x)) ** 2
        denominator = len(x) * np.sum(x ** 2) + 1e-8
        return numerator / denominator

    def _compute_rewards(
        self,
        grid_import,
        grid_export,
        actual_sold,
        actual_bought,
        charge_amount,
        discharge_amount,
        costs,
        grid_price,
        peer_price,
    ):
        """Return per-agent float32 rewards for the current step.

        Components: negative cost, a grid-import penalty, P2P sell/buy
        bonuses, an SOC-deviation-from-50% penalty, a degradation penalty
        (battery houses only), and a shared Jain's-fairness bonus over each
        agent's total P2P volume.
        """
        # Weights for each component
        w1 = 0.3; w2 = 0.5; w3 = 0.5; w4 = 0.1; w5 = 0.05; w6 = 0.4; w7 = 1.0

        # Jain's index on total P2P volume
        jfi = self._compute_jains_index(actual_bought + actual_sold)

        # Normalize prices
        p_grid_norm = grid_price / self.max_grid_price
        p_peer_norm = peer_price / self.max_grid_price

        rewards = np.zeros(self.num_agents, dtype=np.float32)
        for i, hid in enumerate(self.house_ids):
            # Base reward is negative cost
            reward = - costs[i] * w7

            # Grid import penalty
            grid_penalty = w1 * grid_import[i] * p_grid_norm

            # P2P sell & buy bonuses
            p2p_sell_bonus = w2 * actual_sold[i] * p_peer_norm
            if peer_price < grid_price:
                p2p_buy_bonus = w3 * actual_bought[i] * ((grid_price - peer_price) / self.max_grid_price)
            else:
                p2p_buy_bonus = 0.0

            # Battery penalties (only solar houses have entries)
            if hid in self.batteries:
                batt = self.batteries[hid]
                soc_frac = batt["soc"] / batt["max_capacity"]
                soc_penalty = w4 * (soc_frac - 0.5) ** 2
                degradation_penalty = w5 * (charge_amount[i] + discharge_amount[i]) * batt["degradation_cost_per_kwh"]
            else:
                soc_penalty = degradation_penalty = 0.0

            # Fairness
            fairness_bonus = w6 * jfi

            # Combine
            reward += (
                - grid_penalty
                + p2p_sell_bonus
                + p2p_buy_bonus
                - soc_penalty
                - degradation_penalty
                + fairness_bonus
            )

            rewards[i] = reward

        return rewards

    def get_episode_metrics(self):
        """
        Return performance metrics for the last completed episode.
        Call after episode finishes (after env.reset()).
        """
        return self.episode_metrics

    def save_log(self, filename="env_log.csv"):
        """Save the per-step totals recorded by step() since the last reset() to CSV."""
        columns = [
            "Step", "Total_Grid_Import", "Total_Grid_Export",
            "Total_P2P_Buy", "Total_P2P_Sell", "Total_Cost",
        ]
        df = pd.DataFrame(self.env_log, columns=columns)
        df.to_csv(filename, index=False)
        print(f"Environment log saved to {filename}")