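# NOTE: the following lines are hosting-page header text captured with the file; kept as comments.
# SolarSys / Other_algorithms /Flat_System /solar_sys_environment.py
# SolarSys2025's picture
# Upload 30 files
# 3930bf5 verified
# raw
# history blame
# 24.8 kB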
import gym
import pandas as pd
import numpy as np
from collections import deque
import random
# Seed both the stdlib and the NumPy global RNGs at import time. The class below
# draws from both (random.choice / random.uniform and np.random.randint), so this
# fixes those draws for a fresh interpreter. Note this is a module-level side
# effect: importing this file reseeds the global generators for the whole process.
random.seed(42)
np.random.seed(42)
class SolarSys(gym.Env):
    """Multi-agent household energy environment with solar, batteries and P2P trading.

    Every house in the input CSV (one ``grid_<id>`` and one ``total_solar_<id>``
    column per house) is an agent. An episode is one randomly chosen day of the
    resampled data. At every step each agent submits six values in [0, 1]
    (sell to grid, buy from grid, sell to peers, buy from peers, charge battery,
    discharge battery) and receives an eight-feature observation. Grid prices
    follow a per-state time-of-use tariff; the peer price is derived from the
    current aggregate surplus and shortfall.
    """
def __init__(
    self,
    data_path="/path/to/project/training/200houses_152days_TRAIN.csv",
    state="oklahoma",  # for Oklahoma (example)
    time_freq="15T",  # "15T", "30T", "1H", "3H", "6H"
):
    """Load the household dataset and set up pricing, batteries and spaces.

    Args:
        data_path: CSV file with a ``local_15min`` timestamp column plus one
            ``grid_<id>`` and one ``total_solar_<id>`` column per house.
        state: Tariff to use; one of "oklahoma", "colorado", "pennsylvania"
            (case-insensitive).
        time_freq: pandas offset alias the data is resampled to (mean per bin).

    Raises:
        ValueError: Unsupported ``state``, empty/unreadable data, or fewer
            rows than a single day after resampling.
        FileNotFoundError: ``data_path`` does not exist.
        KeyError: A house has a ``grid_<id>`` column but no matching
            ``total_solar_<id>`` column.
    """
    super().__init__()

    # Store config
    self.data_path = data_path
    self.time_freq = time_freq
    self.state = state.lower()

    # Centralized Pricing Configuration
    self._pricing_info = {
        "oklahoma": {
            "max_grid_price": 0.2112,
            "feed_in_tariff": 0.04,
            "price_function": self._get_oklahoma_price,
        },
        "colorado": {
            "max_grid_price": 0.32,
            "feed_in_tariff": 0.055,
            "price_function": self._get_colorado_price,
        },
        "pennsylvania": {
            "max_grid_price": 0.12505,
            "feed_in_tariff": 0.06,
            "price_function": self._get_pennsylvania_price,
        },
    }
    if self.state not in self._pricing_info:
        raise ValueError(f"State '{self.state}' is not supported. Available states: {list(self._pricing_info.keys())}")
    state_config = self._pricing_info[self.state]
    self.max_grid_price = state_config["max_grid_price"]
    self.feed_in_tariff = state_config["feed_in_tariff"]
    self._get_price_function = state_config["price_function"]

    # Load and resample; keep the original exception as the cause so the
    # real failure is visible in tracebacks.
    try:
        all_data = pd.read_csv(data_path)
        all_data["local_15min"] = pd.to_datetime(all_data["local_15min"], utc=True)
        all_data.set_index("local_15min", inplace=True)
        all_data = all_data.resample(time_freq).mean()
    except FileNotFoundError as err:
        raise FileNotFoundError(f"Data file {data_path} not found.") from err
    except pd.errors.EmptyDataError as err:
        raise ValueError(f"Data file {data_path} is empty.") from err
    except Exception as e:
        raise ValueError(f"Error loading data: {e}") from e

    # One agent per "grid_<id>" column.
    self.house_ids = [
        col.split("_")[1] for col in all_data.columns
        if col.startswith("grid_")
    ]
    self.num_agents = len(self.house_ids)

    # Compute global maxima for normalization. Columns are selected per
    # house id so grid and solar line up house-by-house, and nanmax is used
    # because resampling leaves NaN in bins that had no samples.
    grid_cols = [f"grid_{hid}" for hid in self.house_ids]
    solar_cols = [f"total_solar_{hid}" for hid in self.house_ids]
    all_grid = all_data[grid_cols].values
    all_solar = all_data[solar_cols].values
    # max total demand = max(grid + solar) over all time & agents
    self.global_max_demand = float(np.nanmax(all_grid + all_solar)) + 1e-8
    # max solar generation alone
    self.global_max_solar = float(np.nanmax(all_solar)) + 1e-8

    # Store the resampled dataset
    self.all_data = all_data
    freq_offset = pd.tseries.frequencies.to_offset(time_freq)
    minutes_per_step = freq_offset.nanos / 1e9 / 60.0
    self.steps_per_day = int(24 * 60 // minutes_per_step)
    total_rows = len(self.all_data)
    self.total_days = total_rows // self.steps_per_day
    if self.total_days < 1:
        raise ValueError(
            f"After resampling, dataset has {total_rows} rows, which is "
            f"less than a single day of {self.steps_per_day} steps."
        )

    # Baseline grid import without any trading: the recorded grid draw,
    # floored at zero. Missing samples count as zero, matching how reset()
    # fills gaps when it builds the demand series.
    self.original_no_p2p_import = {
        hid: self.all_data[f"grid_{hid}"].fillna(0.0).clip(lower=0.0).values
        for hid in self.house_ids
    }

    # Determine population groups
    # group 1 = has any solar; group 0 = never solar
    solar_sums = self.all_data[solar_cols].sum(axis=0).to_dict()
    self.agent_groups = [
        1 if solar_sums[f"total_solar_{hid}"] > 0 else 0
        for hid in self.house_ids
    ]
    # Count the number of houses in each group
    self.group_counts = {
        0: self.agent_groups.count(0),
        1: self.agent_groups.count(1),
    }
    print(f"Number of houses in each group: {self.group_counts}")

    # Battery logic
    self.battery_options = {
        "teslapowerwall": {"max_capacity": 13.5, "charge_efficiency": 0.95, "discharge_efficiency": 0.90, "max_charge_rate": 5.0, "max_discharge_rate": 5.0, "degradation_cost_per_kwh": 0.005},
        "enphase": {"max_capacity": 5.0, "charge_efficiency": 0.95, "discharge_efficiency": 0.90, "max_charge_rate": 2.0, "max_discharge_rate": 2.0, "degradation_cost_per_kwh": 0.005},
        "franklin": {"max_capacity": 15.0, "charge_efficiency": 0.95, "discharge_efficiency": 0.90, "max_charge_rate": 6.0, "max_discharge_rate": 6.0, "degradation_cost_per_kwh": 0.005},
    }
    # Identify which houses actually have solar
    self.solar_houses = [
        hid for hid in self.house_ids
        if (self.all_data[f"total_solar_{hid}"] > 0).any()
    ]
    # Assign a random battery type to each solar-equipped house
    battery_names = list(self.battery_options)
    self.batteries = {}
    for hid in self.solar_houses:
        specs = self.battery_options[random.choice(battery_names)]
        self.batteries[hid] = {"soc": 0.0, **specs}

    # Observation & Action Spaces
    # Per-agent observation, in the order _get_obs() emits it:
    # [own_demand, own_solar, SOC fraction (-1 if no battery), grid_price,
    #  peer_price, total_demand_others, total_solar_others, time_of_day]
    self.observation_space = gym.spaces.Box(
        low=-np.inf, high=np.inf,
        shape=(self.num_agents, 8),
        dtype=np.float32,
    )
    # [sell_to_grid, buy_from_grid, sell_to_peers, buy_from_peers, charge_battery, discharge_battery]
    self.action_space = gym.spaces.Box(
        low=0.0,
        high=1.0,
        shape=(self.num_agents, 6),
        dtype=np.float32,
    )

    self.episode_metrics = {}
    self._initialize_episode_metrics()

    # Initialize episode variables
    self.data = None
    self.env_log = []
    self.day_index = -1
    self.current_step = 0
    self.num_steps = self.steps_per_day
    self.demands = {}
    self.solars = {}
    self.previous_actions = {
        hid: np.zeros(6) for hid in self.house_ids
    }
def _initialize_episode_metrics(self):
    """Reset every per-episode accumulator to its starting value.

    Scalars go back to 0.0, the per-agent savings vector to zeros of length
    ``num_agents``, and each per-step time series to a fresh empty list.
    """
    # Running totals.
    (
        self.cumulative_grid_reduction,
        self.cumulative_grid_reduction_peak,
        self.cumulative_degradation_cost,
    ) = (0.0, 0.0, 0.0)

    # Per-agent savings accumulated over the episode.
    self.agent_cost_savings = np.zeros(self.num_agents)

    # Per-step histories; each gets its own list object.
    (
        self.degradation_cost_timeseries,
        self.cost_savings_timeseries,
        self.grid_reduction_timeseries,
    ) = ([], [], [])
# Price Functions
def get_grid_price(self, step_idx):
    """Grid tariff at ``step_idx``, delegated to the selected state's price function."""
    price_fn = self._get_price_function
    return price_fn(step_idx)
def _get_oklahoma_price(self, step_idx):
    """Oklahoma Gas & Electric (OG&E) time-of-use price for a step index.

    The step index is converted to an hour of day (wrapping every 24 h)
    using ``steps_per_day``; 2 pm to 7 pm is peak, everything else off-peak.
    """
    step_minutes = 24 * 60 / self.steps_per_day
    elapsed_minutes = step_idx * step_minutes
    hour_of_day = int(elapsed_minutes // 60) % 24

    in_peak_window = 14 <= hour_of_day < 19  # 2 pm - 7 pm
    return 0.2112 if in_peak_window else 0.0434
def _get_colorado_price(self, step_idx):
    """Xcel Energy Colorado time-of-use price for a step index.

    The step index is converted to an hour of day (wrapping every 24 h)
    using ``steps_per_day``. Bands: 3 pm-7 pm on-peak, 1 pm-3 pm mid-peak,
    all other hours off-peak.
    """
    step_minutes = 24 * 60 / self.steps_per_day
    hour_of_day = int((step_idx * step_minutes) // 60) % 24

    # Off-peak: before 1 pm and from 7 pm onwards.
    if hour_of_day < 13 or hour_of_day >= 19:
        return 0.12
    # Mid-peak: 1 pm to 3 pm.
    if hour_of_day < 15:
        return 0.22
    # On-peak: 3 pm to 7 pm.
    return 0.32
def _get_pennsylvania_price(self, step_idx):
    """Duquesne Light (Pennsylvania) EV time-of-use price for a step index.

    The step index is converted to an hour of day (wrapping every 24 h)
    using ``steps_per_day``. Bands: 1 pm-9 pm peak, 11 pm-6 am super
    off-peak, remaining hours (6 am-1 pm and 9 pm-11 pm) off-peak.
    """
    step_minutes = 24 * 60 / self.steps_per_day
    hour_of_day = int((step_idx * step_minutes) // 60) % 24

    # Peak: 1 pm to 9 pm.
    if 13 <= hour_of_day < 21:
        return 0.125048
    # Outside the 6 am - 11 pm daytime span the super off-peak rate applies.
    is_overnight = not (6 <= hour_of_day < 23)
    return 0.057014 if is_overnight else 0.079085
def get_peer_price(self, step_idx, total_surplus, total_shortfall):
    """Peer-to-peer price for a step, driven by the surplus/shortfall balance.

    Starts from 90% of the grid price and scales it by
    ``exp(0.3 * (shortfall - surplus) / (shortfall + surplus + 1e-6))``,
    then clips the result between the feed-in tariff and the grid price.
    If the feed-in tariff exceeds the grid price, ``np.clip`` yields the
    grid price (its upper bound wins).

    Returns:
        The clipped price as a Python ``float``.
    """
    price_ceiling = self.get_grid_price(step_idx)
    price_floor = self.feed_in_tariff

    # Imbalance lies in (-1, 1): positive when demand outweighs supply.
    imbalance = (total_shortfall - total_surplus) / (total_shortfall + total_surplus + 1e-6)
    unclipped = (price_ceiling * 0.90) * np.exp(0.3 * imbalance)

    return float(np.clip(unclipped, price_floor, price_ceiling))
def reset(self):
    """Start a new one-day episode and return the initial observations.

    If the previous episode advanced at least one step, its metrics are
    snapshotted into ``episode_metrics`` first. A day is then drawn uniformly
    at random, per-house demand/solar series are rebuilt for it, bookkeeping
    is cleared, and every battery's state of charge is drawn uniformly
    between 30% and 70% of its capacity.

    Returns:
        A list with one observation row per agent.
    """
    # Finalize and store metrics from the completed episode before resetting.
    if self.current_step > 0:
        gains = self.agent_cost_savings[self.agent_cost_savings > 0]
        # Fairness is only evaluated when more than one agent saved money.
        fairness_on_savings = self._compute_jains_index(gains) if len(gains) > 1 else 0.0
        self.episode_metrics = {
            "grid_reduction_entire_day": self.cumulative_grid_reduction,
            "grid_reduction_peak_hours": self.cumulative_grid_reduction_peak,
            "total_cost_savings": np.sum(self.agent_cost_savings),
            "fairness_on_cost_savings": fairness_on_savings,
            "battery_degradation_cost_total": self.cumulative_degradation_cost,
            "degradation_cost_over_time": self.degradation_cost_timeseries,
            "cost_savings_over_time": self.cost_savings_timeseries,
            "grid_reduction_over_time": self.grid_reduction_timeseries,
        }

    # Pick the day and slice its rows out of the full dataset.
    self.day_index = np.random.randint(0, self.total_days)
    first_row = self.day_index * self.steps_per_day
    day_rows = slice(first_row, first_row + self.steps_per_day)
    self.data = self.all_data.iloc[day_rows].copy()

    self.no_p2p_import_day = {
        hid: self.original_no_p2p_import[hid][day_rows] for hid in self.house_ids
    }

    # Demand is reconstructed as grid draw + (non-negative) solar, floored at 0.
    self.demands, self.solars = {}, {}
    for hid in self.house_ids:
        grid_draw = self.data[f"grid_{hid}"].fillna(0.0).values
        generation = self.data[f"total_solar_{hid}"].fillna(0.0).clip(lower=0.0).values
        self.solars[hid] = generation
        self.demands[hid] = np.clip(grid_draw + generation, 0.0, None)

    self.current_step = 0
    self.env_log = []

    # Reset previous_actions to 6 zeros
    for hid in self.house_ids:
        self.previous_actions[hid] = np.zeros(6)

    self._initialize_episode_metrics()

    # Randomize battery SOC between 30%-70% of capacity
    for batt in self.batteries.values():
        capacity = batt["max_capacity"]
        batt["soc"] = random.uniform(0.30 * capacity, 0.70 * capacity)

    first_obs = self._get_obs()
    return [first_obs[agent] for agent in range(self.num_agents)]
def step(self, actions):
    """Advance the market by one time step.

    Order of operations for each house: own solar covers own demand first;
    the battery may discharge into the remaining shortfall and charge from
    the remaining surplus; peer-to-peer offers and requests are matched
    pro rata; finally grid trades are settled, and any shortfall still left
    is imported from the grid regardless of the buy-from-grid action.

    One summary row per step is appended to ``env_log`` in the column order
    used by ``save_log`` (step, total grid import, total grid export,
    total P2P bought, total P2P sold, total cost).

    Args:
        actions: Array-like of shape ``(num_agents, 6)`` ordered as
            [sell_to_grid, buy_from_grid, sell_to_peers, buy_from_peers,
            charge_battery, discharge_battery]; values are clipped to [0, 1].

    Returns:
        ``(observations, rewards, done, info)`` where observations and
        rewards are lists with one entry per agent, ``done`` is True once
        ``num_steps`` steps have been taken, and ``info`` holds per-agent
        arrays and step-level metrics.

    Raises:
        ValueError: ``actions`` does not have shape ``(num_agents, 6)``.
    """
    n_agents = self.num_agents

    # Validate & clamp actions
    actions = np.array(actions, dtype=np.float32)
    if actions.shape != (n_agents, 6):
        raise ValueError(f"Actions shape mismatch: got {actions.shape}, expected {(n_agents, 6)}")
    actions = np.clip(actions, 0.0, 1.0)
    a_sellGrid, a_buyGrid, a_sellPeers, a_buyPeers, a_chargeBatt, a_dischargeBatt = actions.T

    # Gather current demand & solar
    t = self.current_step
    demands = np.array([self.demands[hid][t] for hid in self.house_ids], dtype=np.float32)
    solars = np.array([self.solars[hid][t] for hid in self.house_ids], dtype=np.float32)

    # Enforce "self-use first": only the net position is tradable.
    final_shortfall = np.maximum(demands - solars, 0.0)
    final_surplus = np.maximum(solars - demands, 0.0)

    # Prices are based on the aggregate position before batteries act.
    total_surplus = final_surplus.sum()
    total_shortfall = final_shortfall.sum()
    peer_price = self.get_peer_price(t, total_surplus, total_shortfall)
    grid_price = self.get_grid_price(t)

    # Battery discharge, then charge, then what is left to offer to peers.
    # Each house only touches its own battery and its own balance, so the
    # three stages can run back to back per house.
    discharge_amount = np.zeros(n_agents, dtype=np.float32)
    charge_amount = np.zeros(n_agents, dtype=np.float32)
    battery_offer = np.zeros(n_agents, dtype=np.float32)
    for i, hid in enumerate(self.house_ids):
        batt = self.batteries.get(hid)
        if batt is None:
            continue

        # Discharge: limited by rate, deliverable energy and own shortfall.
        available = batt["soc"] * batt["discharge_efficiency"]
        desired = a_dischargeBatt[i] * batt["max_discharge_rate"]
        actual = min(desired, available, final_shortfall[i])
        batt["soc"] -= actual / batt["discharge_efficiency"]
        final_shortfall[i] -= actual
        discharge_amount[i] = actual

        # Charge: limited by rate, remaining capacity and own surplus.
        cap_left = batt["max_capacity"] - batt["soc"]
        desired = a_chargeBatt[i] * batt["max_charge_rate"]
        actual = min(desired, cap_left / batt["charge_efficiency"], final_surplus[i])
        batt["soc"] += actual * batt["charge_efficiency"]
        final_surplus[i] -= actual
        charge_amount[i] = actual

        # Deliverable stored energy that may be sold to peers.
        battery_offer[i] = batt["soc"] * batt["discharge_efficiency"]

    # P2P matching: the sign of (buy - sell) decides the side, its
    # magnitude the fraction of the shortfall / sellable energy put up.
    effective_surplus = final_surplus + battery_offer
    netPeer = a_buyPeers - a_sellPeers
    p2p_buy_request = np.where(netPeer > 0, netPeer * final_shortfall, 0.0).astype(np.float32)
    p2p_sell_offer = np.where(netPeer < 0, -netPeer * effective_surplus, 0.0).astype(np.float32)

    total_sell = np.sum(p2p_sell_offer)
    total_buy = np.sum(p2p_buy_request)
    matched = min(total_sell, total_buy)
    if matched > 1e-9:
        # The short side is fully served; the long side is scaled pro rata.
        sell_fraction = p2p_sell_offer / (total_sell + 1e-12)
        buy_fraction = p2p_buy_request / (total_buy + 1e-12)
        actual_sold = matched * sell_fraction
        actual_bought = matched * buy_fraction
    else:
        actual_sold = np.zeros(n_agents, dtype=np.float32)
        actual_bought = np.zeros(n_agents, dtype=np.float32)

    # Peer sales are drawn from the battery first, then from solar surplus.
    from_batt_p2p = np.minimum(actual_sold, battery_offer)
    from_solar_p2p = actual_sold - from_batt_p2p

    # Update balances
    final_surplus -= from_solar_p2p
    final_shortfall -= actual_bought

    # Deduct peer battery sales from SOC
    for i, hid in enumerate(self.house_ids):
        batt = self.batteries.get(hid)
        if batt is not None:
            from_batt = min(actual_sold[i], battery_offer[i])
            batt["soc"] = max(0.0, batt["soc"] - from_batt / batt["discharge_efficiency"])

    # Grid trades
    netGrid = a_buyGrid - a_sellGrid
    grid_import = np.where(netGrid > 0, netGrid * final_shortfall, 0.0).astype(np.float32)
    grid_export = np.where(netGrid < 0, -netGrid * final_surplus, 0.0).astype(np.float32)
    # Demand must be met: whatever shortfall remains is imported anyway.
    forced = np.maximum(final_shortfall - grid_import, 0.0)
    grid_import += forced

    # Calculate costs
    costs = (grid_import * grid_price) - (grid_export * self.feed_in_tariff) + \
        (actual_bought * peer_price) - (actual_sold * peer_price)

    # Calculate rewards
    final_rewards = self._compute_rewards(
        grid_import=grid_import, grid_export=grid_export,
        actual_sold=actual_sold, actual_bought=actual_bought,
        charge_amount=charge_amount, discharge_amount=discharge_amount,
        costs=costs, grid_price=grid_price, peer_price=peer_price
    )

    # Metric calculations for the current step
    no_p2p_import_this_step = np.array([
        self.no_p2p_import_day[hid][t] for hid in self.house_ids
    ], dtype=np.float32)

    # Grid Reduction metrics
    step_grid_reduction = np.sum(no_p2p_import_this_step - grid_import)
    self.cumulative_grid_reduction += step_grid_reduction
    self.grid_reduction_timeseries.append(step_grid_reduction)
    # Check if current grid price corresponds to peak hour
    if grid_price >= self.max_grid_price * 0.99:
        self.cumulative_grid_reduction_peak += step_grid_reduction

    # Cost Savings
    cost_no_p2p = no_p2p_import_this_step * grid_price
    step_cost_savings_per_agent = cost_no_p2p - costs
    self.agent_cost_savings += step_cost_savings_per_agent
    self.cost_savings_timeseries.append(np.sum(step_cost_savings_per_agent))

    # Battery Degradation Cost
    step_degradation_cost = 0.0
    for i, hid in enumerate(self.house_ids):
        if hid in self.batteries:
            batt = self.batteries[hid]
            step_degradation_cost += (charge_amount[i] + discharge_amount[i]) * batt["degradation_cost_per_kwh"]
    self.cumulative_degradation_cost += step_degradation_cost
    self.degradation_cost_timeseries.append(step_degradation_cost)

    # Record the step summary so save_log() has rows to write.
    self.env_log.append([
        t,
        float(np.sum(grid_import)),
        float(np.sum(grid_export)),
        float(np.sum(actual_bought)),
        float(np.sum(actual_sold)),
        float(np.sum(costs)),
    ])

    info = {
        "p2p_buy": actual_bought,
        "p2p_sell": actual_sold,
        "grid_import_with_p2p": grid_import,
        "grid_import_no_p2p": no_p2p_import_this_step,
        "grid_export": grid_export,
        "costs": costs,
        "charge_amount": charge_amount,
        "discharge_amount": discharge_amount,
        "step": t,
        "step_grid_reduction": step_grid_reduction,
        "step_cost_savings": np.sum(step_cost_savings_per_agent),
        "step_degradation_cost": step_degradation_cost,
    }

    # Increment step & decide "done"
    self.current_step += 1
    done = (self.current_step >= self.num_steps)

    # Return next obs, reward list, done, info
    obs_next = self._get_obs()
    obs_next_list = [obs_next[i] for i in range(n_agents)]
    rewards_list = [final_rewards[i] for i in range(n_agents)]
    return obs_next_list, rewards_list, done, info
def _get_obs(self):
    """Build the ``(num_agents, 8)`` float32 observation matrix for the current step.

    Column order per agent: own demand, own solar, battery SOC fraction
    (-1 for houses without a battery), grid price, peer price, summed demand
    of the other agents, summed solar of the other agents, hour of day
    (fractional, taken from the data index timestamp). The step index is
    capped at the last step of the day.
    """
    t = min(self.current_step, self.num_steps - 1)
    n_agents = len(self.house_ids)

    # Per-agent demand and generation at this step.
    demand_now = np.array([self.demands[hid][t] for hid in self.house_ids], dtype=np.float32)
    solar_now = np.array([self.solars[hid][t] for hid in self.house_ids], dtype=np.float32)

    # Market aggregates feeding the dynamic peer price.
    total_surplus = float(np.maximum(solar_now - demand_now, 0.0).sum())
    total_shortfall = float(np.maximum(demand_now - solar_now, 0.0).sum())
    grid_price = self.get_grid_price(t)
    peer_price = self.get_peer_price(t, total_surplus, total_shortfall)

    # Time-of-day feature from the timestamp of the current row.
    stamp = self.data.index[t]
    hour = stamp.hour + stamp.minute / 60.0

    # State of charge as a fraction of capacity; -1 marks "no battery".
    soc_frac = np.array(
        [
            self.batteries[hid]["soc"] / self.batteries[hid]["max_capacity"]
            if hid in self.batteries else -1.0
            for hid in self.house_ids
        ],
        dtype=np.float32,
    )

    feature_columns = (
        demand_now,
        solar_now,
        soc_frac,
        np.full(n_agents, grid_price, dtype=np.float32),
        np.full(n_agents, peer_price, dtype=np.float32),
        demand_now.sum() - demand_now,
        solar_now.sum() - solar_now,
        np.full(n_agents, hour, dtype=np.float32),
    )
    return np.column_stack(feature_columns).astype(np.float32)
def _compute_jains_index(self, usage_array):
    """Jain's fairness index ``(sum x)^2 / (n * sum x^2)`` of ``usage_array``.

    Values are evaluated in float32 and a 1e-8 term in the denominator keeps
    the all-zero (and empty) case at 0 instead of dividing by zero.
    """
    values = np.array(usage_array, dtype=np.float32)
    squared_total = np.sum(values) ** 2
    scaled_square_sum = len(values) * np.sum(values ** 2) + 1e-8
    return squared_total / scaled_square_sum
def _compute_rewards(
    self,
    grid_import,
    grid_export,
    actual_sold,
    actual_bought,
    charge_amount,
    discharge_amount,
    costs,
    grid_price,
    peer_price
):
    """Per-agent reward for one step, returned as a float32 array.

    Each reward is the negative cost plus shaping terms: a penalty on grid
    import, bonuses for energy sold to and bought from peers (the buy bonus
    only when the peer price undercuts the grid price), battery penalties
    for straying from 50% state of charge and for throughput, and a shared
    fairness bonus from Jain's index over each agent's total P2P volume.
    ``grid_export`` is accepted but does not enter the reward.
    """
    # Weights for each component
    w1, w2, w3, w4, w5, w6, w7 = 0.3, 0.5, 0.5, 0.1, 0.05, 0.4, 1.0

    # Jain's index on total P2P volume; identical bonus for every agent.
    jfi = self._compute_jains_index(actual_bought + actual_sold)
    fairness_bonus = w6 * jfi

    # Normalize prices
    p_grid_norm = grid_price / self.max_grid_price
    p_peer_norm = peer_price / self.max_grid_price

    # Buying from peers is only rewarded when it beats the grid price.
    if peer_price < grid_price:
        buy_discount = (grid_price - peer_price) / self.max_grid_price
    else:
        buy_discount = None

    rewards = np.zeros(self.num_agents, dtype=np.float32)
    for i, hid in enumerate(self.house_ids):
        # Grid import penalty
        grid_penalty = w1 * grid_import[i] * p_grid_norm

        # P2P sell & buy bonuses
        p2p_sell_bonus = w2 * actual_sold[i] * p_peer_norm
        if buy_discount is None:
            p2p_buy_bonus = 0.0
        else:
            p2p_buy_bonus = w3 * actual_bought[i] * buy_discount

        # Battery penalties (only houses with a battery entry)
        batt = self.batteries.get(hid)
        if batt is None:
            soc_penalty = degradation_penalty = 0.0
        else:
            soc_frac = batt["soc"] / batt["max_capacity"]
            soc_penalty = w4 * (soc_frac - 0.5) ** 2
            degradation_penalty = w5 * (charge_amount[i] + discharge_amount[i]) * batt["degradation_cost_per_kwh"]

        # Base reward is negative cost, then the shaping terms.
        reward = - costs[i] * w7
        reward += (
            - grid_penalty
            + p2p_sell_bonus
            + p2p_buy_bonus
            - soc_penalty
            - degradation_penalty
            + fairness_bonus
        )
        rewards[i] = reward
    return rewards
def get_episode_metrics(self):
    """Return the metrics dict of the last finished episode.

    The dict is filled in by ``reset()`` when it closes out an episode, so
    query it after the reset that follows the episode of interest.
    """
    last_episode = self.episode_metrics
    return last_episode
def save_log(self, filename="env_log.csv"):
    """Write ``env_log`` to ``filename`` as a six-column CSV and report the path."""
    header = (
        "Step",
        "Total_Grid_Import",
        "Total_Grid_Export",
        "Total_P2P_Buy",
        "Total_P2P_Sell",
        "Total_Cost",
    )
    log_frame = pd.DataFrame(self.env_log, columns=list(header))
    log_frame.to_csv(filename, index=False)
    print(f"Environment log saved to {filename}")