# SolarSys / Other_algorithms / Flat_System / solar_sys_environment.py
# SolarSys2025's picture
# Upload 30 files
# 55da406 verified
import gym
import pandas as pd
import numpy as np
import random
from gym.spaces import Box
# Seed both global RNGs once at import time. SolarSys.__init__ draws battery
# models from `random`; SolarSys.reset() draws the episode day and the initial
# battery state of charge from `np.random`.
random.seed(42)
np.random.seed(42)
class SolarSys(gym.Env):
    """
    Flat (non-hierarchical) OpenAI Gym Environment for Multi-Agent energy management
    in a residential cluster, featuring complex P2P pricing and reward structures
    similar to the low-level agents in the Hierarchical model.

    One episode is one randomly selected day of the dataset, split into
    ``steps_per_day`` steps. Every household (one per ``grid_<id>`` column) is
    an agent.

    Observation (per agent, 8 values):
        [demand, solar, SOC_frac, grid_price, peer_price,
         total_demand_others, total_solar_others, hour]
        ``SOC_frac`` is -1 for agents without a battery.

    Action (per agent, 6 values in [0, 1]):
        [sell_grid, buy_grid, sell_peers, buy_peers, charge_batt, discharge_batt]

    API notes:
        ``reset()`` returns ``(obs, {})`` while ``step()`` returns the 4-tuple
        ``(obs, rewards_list, done, info)``.
    """

    def __init__(
        self,
        data_path: str = "./data/training/simulated_data.csv",
        state: str = "region_a",  # Generalized: region_a, region_b, region_c
        time_freq: str = "15T",
    ):
        """
        Load the dataset and build the static per-agent configuration.

        Args:
            data_path: CSV file with a ``local_15min`` timestamp column plus
                ``grid_<id>`` and ``total_solar_<id>`` columns per household.
            state: Pricing region (case-insensitive); one of the keys of
                ``self._pricing_info``.
            time_freq: pandas offset alias the data is resampled to.

        Raises:
            FileNotFoundError: If ``data_path`` does not exist.
            ValueError: If the region is unsupported, the file is empty or
                cannot be parsed, no household columns are present, a solar
                column is missing, or less than one day of data is available.
        """
        super().__init__()
        self.data_path = data_path
        self.time_freq = time_freq
        self.state = state.lower()

        # --- Generalized Pricing Configuration ---
        self._pricing_info = {
            "region_a": {
                "max_grid_price": 0.2112,
                "feed_in_tariff": 0.04,
                "price_function": self._get_region_a_price,
            },
            "region_b": {
                "max_grid_price": 0.32,
                "feed_in_tariff": 0.055,
                "price_function": self._get_region_b_price,
            },
            "region_c": {
                "max_grid_price": 0.12505,
                "feed_in_tariff": 0.06,
                "price_function": self._get_region_c_price,
            },
        }
        if self.state not in self._pricing_info:
            raise ValueError(f"State '{self.state}' is not supported. Available states: {list(self._pricing_info.keys())}")
        state_config = self._pricing_info[self.state]
        self.max_grid_price = state_config["max_grid_price"]
        self.feed_in_tariff = state_config["feed_in_tariff"]
        self._get_price_function = state_config["price_function"]

        # --- Data Loading ---
        # Original exceptions are chained so the root cause stays in the traceback.
        try:
            all_data = pd.read_csv(data_path)
            all_data["local_15min"] = pd.to_datetime(all_data["local_15min"], utc=True)
            all_data.set_index("local_15min", inplace=True)
            all_data = all_data.resample(time_freq).mean()
        except FileNotFoundError as exc:
            raise FileNotFoundError(f"Data file {data_path} not found.") from exc
        except pd.errors.EmptyDataError as exc:
            raise ValueError(f"Data file {data_path} is empty.") from exc
        except Exception as exc:
            raise ValueError(f"Error loading data: {exc}") from exc

        # --- Households ---
        grid_cols = [c for c in all_data.columns if c.startswith("grid_")]
        solar_cols = [c for c in all_data.columns if c.startswith("total_solar_")]
        self.house_ids = [col.split("_")[1] for col in grid_cols]
        self.num_agents = len(self.house_ids)
        if self.num_agents == 0:
            raise ValueError("Dataset contains no 'grid_<id>' household columns.")

        # Solar columns in the same household order as grid_cols, so that the
        # demand computation below pairs each home with its own generation.
        house_solar_cols = [f"total_solar_{hid}" for hid in self.house_ids]
        missing_solar = [c for c in house_solar_cols if c not in all_data.columns]
        if missing_solar:
            raise ValueError(f"Missing solar columns for households: {missing_solar}")

        # Compute global maxima for normalization.
        # resample().mean() leaves NaN in empty bins, so use NaN-aware maxima.
        all_grid = all_data[grid_cols].values
        paired_solar = all_data[house_solar_cols].values
        all_solar = all_data[solar_cols].values
        self.global_max_demand = float(np.nanmax(all_grid + paired_solar)) + 1e-8
        self.global_max_solar = float(np.nanmax(all_solar)) + 1e-8
        self.all_data = all_data

        # Calculate time steps
        freq_offset = pd.tseries.frequencies.to_offset(time_freq)
        minutes_per_step = freq_offset.nanos / 1e9 / 60.0
        self.steps_per_day = int(24 * 60 // minutes_per_step)
        if self.steps_per_day < 1:
            raise ValueError(f"time_freq '{time_freq}' is longer than one day.")
        total_rows = len(self.all_data)
        self.total_days = total_rows // self.steps_per_day
        if self.total_days < 1:
            raise ValueError("Dataset has less than a single day of data.")

        # Baseline grid import per household (no P2P, no battery control).
        # Missing samples are treated as 0, exactly as reset() treats them for
        # demand; otherwise a single NaN would poison the cumulative metrics.
        self.original_no_p2p_import = {
            hid: self.all_data[f"grid_{hid}"].fillna(0.0).clip(lower=0.0).values
            for hid in self.house_ids
        }

        # Determine population groups and battery assignments
        # Group 1 = household with any solar generation in the dataset, else 0.
        solar_sums = self.all_data[solar_cols].sum(axis=0).to_dict()
        self.agent_groups = [
            1 if solar_sums[f"total_solar_{hid}"] > 0 else 0 for hid in self.house_ids
        ]
        self.solar_houses = [
            hid for hid, group in zip(self.house_ids, self.agent_groups) if group == 1
        ]
        self.battery_options = {
            "teslapowerwall": {"max_capacity": 13.5, "charge_efficiency": 0.95, "discharge_efficiency": 0.90, "max_charge_rate": 5.0, "max_discharge_rate": 5.0, "degradation_cost_per_kwh": 0.005},
            "enphase": {"max_capacity": 5.0, "charge_efficiency": 0.95, "discharge_efficiency": 0.90, "max_charge_rate": 2.0, "max_discharge_rate": 2.0, "degradation_cost_per_kwh": 0.005},
            "franklin": {"max_capacity": 15.0, "charge_efficiency": 0.95, "discharge_efficiency": 0.90, "max_charge_rate": 6.0, "max_discharge_rate": 6.0, "degradation_cost_per_kwh": 0.005},
        }

        # Initialize battery specs as vectorized arrays (Crucial for speed)
        self.batteries = {}
        self.has_battery = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_max_capacity = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_charge_efficiency = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_discharge_efficiency = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_max_charge_rate = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_max_discharge_rate = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_degradation_cost = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_soc = np.zeros(self.num_agents, dtype=np.float32)

        # Every solar household gets one randomly chosen battery model. The
        # draws happen in household order, one random.choice per solar home.
        battery_names = list(self.battery_options)
        for i, group in enumerate(self.agent_groups):
            if group != 1:
                continue
            hid = self.house_ids[i]
            choice = random.choice(battery_names)
            specs = self.battery_options[choice]
            self.batteries[hid] = specs
            self.has_battery[i] = 1.0
            self.battery_max_capacity[i] = specs["max_capacity"]
            self.battery_charge_efficiency[i] = specs["charge_efficiency"]
            self.battery_discharge_efficiency[i] = specs["discharge_efficiency"]
            self.battery_max_charge_rate[i] = specs["max_charge_rate"]
            self.battery_max_discharge_rate[i] = specs["max_discharge_rate"]
            self.battery_degradation_cost[i] = specs["degradation_cost_per_kwh"]

        # Observation & Action Spaces
        # [demand, solar, SOC_frac, grid_price, peer_price, total_demand_others, total_solar_others, hour]
        self.observation_space = Box(
            low=-np.inf, high=np.inf,
            shape=(self.num_agents, 8),
            dtype=np.float32,
        )
        # Action: [sell_grid, buy_grid, sell_peers, buy_peers, charge_batt, discharge_batt]
        self.action_space = Box(
            low=0.0,
            high=1.0,
            shape=(self.num_agents, 6),
            dtype=np.float32,
        )

        self.episode_metrics = {}
        self._initialize_episode_metrics()

        # Initialize episode variables (populated by reset())
        self.data = None
        self.demands_day = None
        self.solars_day = None
        self.hours_day = None
        self.current_step = 0
        self.num_steps = self.steps_per_day
        self.previous_actions = np.zeros((self.num_agents, 6), dtype=np.float32)

    def _initialize_episode_metrics(self):
        """Initialize or reset all metrics tracked over a single episode."""
        self.cumulative_grid_reduction = 0.0
        self.cumulative_grid_reduction_peak = 0.0
        self.cumulative_degradation_cost = 0.0
        self.agent_cost_savings = np.zeros(self.num_agents, dtype=np.float32)
        self.degradation_cost_timeseries = []
        self.cost_savings_timeseries = []
        self.grid_reduction_timeseries = []

    # --- Price Functions (Generalized) ---
    def get_grid_price(self, step_idx):
        """Return grid price for the current step."""
        return self._get_price_function(step_idx)

    def _hour_of_step(self, step_idx):
        """Map a step index to an hour of day in [0, 23], counting step 0 as 00:00."""
        minutes_per_step = 24 * 60 / self.steps_per_day
        return int((step_idx * minutes_per_step) // 60) % 24

    def _get_region_a_price(self, step_idx):
        """Two-tier tariff: peak price from 14:00 up to 19:00, off-peak otherwise."""
        hour = self._hour_of_step(step_idx)
        if 14 <= hour < 19:
            return 0.2112
        return 0.0434

    def _get_region_b_price(self, step_idx):
        """Three-tier tariff: peak 15:00-19:00, shoulder 13:00-15:00, else off-peak."""
        hour = self._hour_of_step(step_idx)
        if 15 <= hour < 19:
            return 0.32
        if 13 <= hour < 15:
            return 0.22
        return 0.12

    def _get_region_c_price(self, step_idx):
        """Three-tier tariff: peak 13:00-21:00, night rate 23:00-06:00, else mid rate."""
        hour = self._hour_of_step(step_idx)
        if 13 <= hour < 21:
            return 0.125048
        if hour >= 23 or hour < 6:
            return 0.057014
        return 0.079085

    def get_peer_price(self, step_idx, total_surplus, total_shortfall):
        """
        Calculates P2P price based on supply/demand ratio (Arctangent-log approach).
        This matches the logic used in the Hierarchical model's coordination layer.

        Args:
            step_idx: Step index used to look up the grid price.
            total_surplus: Cluster-wide surplus energy (supply).
            total_shortfall: Cluster-wide unmet demand.

        Returns:
            The peer price as a float, clipped to [feed_in_tariff, grid_price].
        """
        grid_price = self.get_grid_price(step_idx)
        feed_in_tariff = self.feed_in_tariff

        # Parameters for arctangent-log pricing
        p_balance = (grid_price * 0.80) + (feed_in_tariff * 0.20)
        p_con = (grid_price - feed_in_tariff) / (1.5 * np.pi)
        k = 1.5
        epsilon = 1e-6  # keeps the ratio and its log finite when either side is 0

        supply = total_surplus + epsilon
        demand = total_shortfall + epsilon
        ratio = demand / supply
        log_ratio = np.log(ratio)

        # Signed power: a fractional exponent of a negative number is undefined,
        # so raise the magnitude and restore the sign afterwards.
        if log_ratio < 0:
            power_term = - (np.abs(log_ratio) ** k)
        else:
            power_term = log_ratio ** k

        price_offset = 2 * np.pi * p_con * np.arctan(power_term)
        peer_price = p_balance + price_offset
        final_price = float(np.clip(peer_price, feed_in_tariff, grid_price))
        return final_price

    def reset(self):
        """
        Start a new episode on a randomly selected day.

        If the previous episode advanced at least one step, its summary is
        stored in ``self.episode_metrics`` before the per-episode counters are
        cleared (so ``get_episode_metrics()`` reflects an episode only after the
        following ``reset()``).

        Returns:
            Tuple ``(obs, {})`` where ``obs`` has shape ``(num_agents, 8)``.
        """
        # 1. Store metrics from completed episode
        if self.current_step > 0:
            positive_savings = self.agent_cost_savings[self.agent_cost_savings > 0]
            fairness_on_savings = self._compute_jains_index(positive_savings) if len(positive_savings) > 1 else 0.0
            self.episode_metrics = {
                "total_cost_savings": np.sum(self.agent_cost_savings),
                "fairness_on_cost_savings": fairness_on_savings,
                "battery_degradation_cost_total": self.cumulative_degradation_cost,
                # ... other metrics ...
            }

        # 2. Select random day and load data
        self.day_index = np.random.randint(0, self.total_days)
        start_row = self.day_index * self.steps_per_day
        end_row = start_row + self.steps_per_day
        day_data = self.all_data.iloc[start_row:end_row].copy()
        self.data = day_data

        # 3. Process Demand and Solar into Vectorized Arrays
        # Arrays have shape (steps_per_day, num_agents), columns in house_ids order.
        grid_cols = [f"grid_{hid}" for hid in self.house_ids]
        solar_cols = [f"total_solar_{hid}" for hid in self.house_ids]
        grid_values = day_data[grid_cols].fillna(0.0).values
        solar_values = day_data[solar_cols].fillna(0.0).clip(lower=0.0).values
        # Demand is reconstructed as net grid flow plus own generation.
        self.demands_day = np.clip(grid_values + solar_values, 0.0, None).astype(np.float32)
        self.solars_day = solar_values.astype(np.float32)
        self.hours_day = (self.data.index.hour + self.data.index.minute / 60.0).values
        self.no_p2p_import_day = np.stack(
            [self.original_no_p2p_import[hid][start_row:end_row] for hid in self.house_ids], axis=1
        )

        # 4. Reset episode metrics and step counter
        self.current_step = 0
        self._initialize_episode_metrics()
        self.previous_actions = np.zeros((self.num_agents, 6), dtype=np.float32)

        # 5. Randomize battery SOC (30%–70%)
        lows = 0.30 * self.battery_max_capacity
        highs = 0.70 * self.battery_max_capacity
        self.battery_soc = np.random.uniform(low=lows, high=highs)
        self.battery_soc *= self.has_battery  # Ensure non-battery homes remain zero

        # 6. Return initial observation
        obs = self._get_obs()
        return obs, {}

    def step(self, actions):
        """
        Advance the environment by one time step.

        Processing order: pricing, self-consumption, battery discharge against
        own shortfall, battery charge from own surplus, pro-rata P2P matching,
        grid trades (any uncovered shortfall is force-imported), costs and
        rewards, metric logging.

        Args:
            actions: Array-like of per-agent actions
                ``[sell_grid, buy_grid, sell_peers, buy_peers, charge_batt,
                discharge_batt]``; values are clipped to [0, 1].

        Returns:
            Tuple ``(obs_next, rewards_list, done, info)``.

        Raises:
            RuntimeError: If called before ``reset()`` or after the episode
                has already finished.
        """
        if self.demands_day is None or self.current_step >= self.num_steps:
            raise RuntimeError(
                "step() called before reset() or after the episode has finished; "
                "call reset() first."
            )

        actions = np.clip(np.array(actions, dtype=np.float32), 0.0, 1.0)
        a_sellGrid, a_buyGrid, a_sellPeers, a_buyPeers, a_chargeBatt, a_dischargeBatt = actions.T
        demands = self.demands_day[self.current_step]
        solars = self.solars_day[self.current_step]

        # 1. Pricing
        total_surplus = np.maximum(solars - demands, 0.0).sum()
        total_shortfall = np.maximum(demands - solars, 0.0).sum()
        peer_price = self.get_peer_price(self.current_step, total_surplus, total_shortfall)
        grid_price = self.get_grid_price(self.current_step)
        feed_in_tariff = self.feed_in_tariff

        # Initial balances (self-use enforced first)
        final_shortfall = np.maximum(demands - solars, 0.0)
        final_surplus = np.maximum(solars - demands, 0.0)

        # --- 2. VECTORIZED BATTERY DISCHARGE ---
        # Delivered energy is limited by the requested rate, the usable stored
        # energy and the agent's own shortfall.
        available_from_batt = self.battery_soc * self.battery_discharge_efficiency
        desired_discharge = a_dischargeBatt * self.battery_max_discharge_rate
        discharge_amount = np.minimum.reduce([desired_discharge, available_from_batt, final_shortfall])
        discharge_amount *= self.has_battery
        # Update SOC and shortfall
        self.battery_soc -= (discharge_amount / (self.battery_discharge_efficiency + 1e-9)) * self.has_battery
        self.battery_soc = np.maximum(0.0, self.battery_soc)
        final_shortfall -= discharge_amount

        # --- 3. VECTORIZED BATTERY CHARGE ---
        # Energy drawn from surplus is limited by the requested rate, the
        # remaining capacity (before efficiency losses) and the agent's surplus.
        cap_left = self.battery_max_capacity - self.battery_soc
        desired_charge = a_chargeBatt * self.battery_max_charge_rate
        charge_limit = cap_left / (self.battery_charge_efficiency + 1e-9)
        charge_amount = np.minimum.reduce([desired_charge, charge_limit, final_surplus])
        charge_amount *= self.has_battery
        # Update SOC and surplus
        self.battery_soc += charge_amount * self.battery_charge_efficiency
        final_surplus -= charge_amount

        # --- 4. VECTORIZED P2P TRADING ---
        # NOTE: the battery offer is the full usable SOC; it is not limited by
        # max_discharge_rate.
        battery_offer = (self.battery_soc * self.battery_discharge_efficiency) * self.has_battery
        effective_surplus = final_surplus + battery_offer
        netPeer = a_buyPeers - a_sellPeers
        p2p_buy_request = np.maximum(0, netPeer) * final_shortfall
        p2p_sell_offer = np.maximum(0, -netPeer) * effective_surplus
        total_sell = np.sum(p2p_sell_offer)
        total_buy = np.sum(p2p_buy_request)
        # The smaller market side clears fully; the other is rationed pro rata.
        matched = min(total_sell, total_buy)
        if matched > 1e-9:
            sell_fraction = p2p_sell_offer / (total_sell + 1e-12)
            buy_fraction = p2p_buy_request / (total_buy + 1e-12)
            actual_sold = matched * sell_fraction
            actual_bought = matched * buy_fraction
        else:
            actual_sold = np.zeros(self.num_agents, dtype=np.float32)
            actual_bought = np.zeros(self.num_agents, dtype=np.float32)

        # Track energy source for sale (battery energy is used first)
        from_batt_p2p = np.minimum(actual_sold, battery_offer)
        from_solar_p2p = actual_sold - from_batt_p2p
        # Update balances
        final_surplus -= from_solar_p2p
        final_shortfall -= actual_bought
        # Deduct peer battery sales from SOC
        soc_reduction_p2p = (from_batt_p2p / (self.battery_discharge_efficiency + 1e-9)) * self.has_battery
        self.battery_soc -= soc_reduction_p2p
        self.battery_soc = np.maximum(0.0, self.battery_soc)

        # --- 5. GRID TRADES ---
        netGrid = a_buyGrid - a_sellGrid
        grid_import = np.maximum(0, netGrid) * final_shortfall
        grid_export = np.maximum(0, -netGrid) * final_surplus
        # Any remaining shortfall must be imported (uncontrolled import)
        forced_import = np.maximum(final_shortfall - grid_import, 0.0)
        grid_import += forced_import

        # --- 6. COSTS AND REWARDS ---
        costs = (
            (grid_import * grid_price)
            - (grid_export * feed_in_tariff)
            + (actual_bought * peer_price)
            - (actual_sold * peer_price)
        )
        final_rewards = self._compute_rewards(
            grid_import, grid_export, actual_sold, actual_bought,
            charge_amount, discharge_amount, costs, grid_price, peer_price
        )

        # --- 7. Metric Logging ---
        no_p2p_import_this_step = self.no_p2p_import_day[self.current_step]
        step_grid_reduction = np.sum(no_p2p_import_this_step - grid_import)
        self.cumulative_grid_reduction += step_grid_reduction
        self.grid_reduction_timeseries.append(step_grid_reduction)
        # Steps priced within 1% of the regional maximum count as peak steps.
        if grid_price >= self.max_grid_price * 0.99:
            self.cumulative_grid_reduction_peak += step_grid_reduction
        cost_no_p2p = no_p2p_import_this_step * grid_price
        step_cost_savings_per_agent = cost_no_p2p - costs
        self.agent_cost_savings += step_cost_savings_per_agent
        self.cost_savings_timeseries.append(np.sum(step_cost_savings_per_agent))
        # NOTE: only the own-use charge/discharge throughput is counted here;
        # battery energy sold to peers (from_batt_p2p) is not included.
        degradation_cost_agent = (charge_amount + discharge_amount) * self.battery_degradation_cost
        step_degradation_cost = np.sum(degradation_cost_agent)
        self.cumulative_degradation_cost += step_degradation_cost
        self.degradation_cost_timeseries.append(step_degradation_cost)

        info = {
            "p2p_buy": actual_bought, "p2p_sell": actual_sold,
            "grid_import_with_p2p": grid_import, "grid_import_no_p2p": no_p2p_import_this_step,
            "grid_export": grid_export, "costs": costs,
            "charge_amount": charge_amount, "discharge_amount": discharge_amount,
            "step": self.current_step, "agent_rewards": final_rewards,
        }

        # --- 8. Finalize Step ---
        self.current_step += 1
        done = (self.current_step >= self.num_steps)
        obs_next = self._get_obs()
        # Output required format for gym multi-agent environment
        rewards_list = list(final_rewards)
        return obs_next, rewards_list, done, info

    def _get_obs(self):
        """
        Build the observation matrix for the current step.

        Returns:
            float32 array of shape ``(num_agents, 8)`` with columns
            [demand, solar, SOC_frac, grid_price, peer_price,
            total_demand_others, total_solar_others, hour].
        """
        # After the final step current_step == num_steps; clamp so the terminal
        # observation re-uses the last row of the day.
        step = min(self.current_step, self.num_steps - 1)
        demands = self.demands_day[step]
        solars = self.solars_day[step]

        # Compute market aggregates
        total_surplus = float(np.maximum(solars - demands, 0.0).sum())
        total_shortfall = float(np.maximum(demands - solars, 0.0).sum())
        grid_price = self.get_grid_price(step)
        peer_price = self.get_peer_price(step, total_surplus, total_shortfall)
        hour = self.hours_day[step]

        # Compute SOC fraction for all agents (-1 for non-battery agents)
        soc_frac = self.battery_soc / (self.battery_max_capacity + 1e-9)
        soc_frac = np.where(self.has_battery == 1, soc_frac, -1.0)

        # Vectorized Observation Construction
        obs = np.stack([
            demands,
            solars,
            soc_frac,
            np.full(self.num_agents, grid_price),
            np.full(self.num_agents, peer_price),
            demands.sum() - demands,  # Total demand of others
            solars.sum() - solars,    # Total solar of others
            np.full(self.num_agents, hour),
        ], axis=1).astype(np.float32)
        return obs

    def _compute_jains_index(self, usage_array):
        """
        Simple Jain's Fairness Index: (sum x)^2 / (n * sum x^2).

        A small constant in the denominator makes an all-zero or empty input
        evaluate to 0 instead of dividing by zero.
        """
        x = np.array(usage_array, dtype=np.float32)
        numerator = (np.sum(x))**2
        denominator = len(x) * np.sum(x**2) + 1e-8
        return numerator / denominator

    def _compute_rewards(
        self, grid_import, grid_export, actual_sold, actual_bought,
        charge_amount, discharge_amount, costs, grid_price, peer_price
    ):
        """
        Calculates the weighted, combined reward for all agents (vectorized).

        Args:
            grid_import: Per-agent energy imported from the grid.
            grid_export: Per-agent energy exported to the grid (accepted for
                signature symmetry; not used in the reward).
            actual_sold: Per-agent energy sold to peers.
            actual_bought: Per-agent energy bought from peers.
            charge_amount: Per-agent energy used to charge the battery.
            discharge_amount: Per-agent energy delivered by the battery.
            costs: Per-agent net monetary cost of the step.
            grid_price: Grid price of the step.
            peer_price: P2P price of the step.

        Returns:
            Array with one reward per agent.
        """
        # Weights (must match the hierarchical model's weights)
        w1 = 0.3   # grid import penalty
        w2 = 0.5   # P2P sell bonus
        w3 = 0.5   # P2P buy bonus
        w4 = 0.1   # SOC deviation penalty
        w5 = 0.05  # battery degradation penalty
        w6 = 0.4   # fairness bonus
        w7 = 1.0   # monetary cost

        # Jain's index on total P2P volume
        jfi = self._compute_jains_index(actual_bought + actual_sold)

        # Normalize prices
        p_grid_norm = grid_price / self.max_grid_price
        p_peer_norm = peer_price / self.max_grid_price

        # Base reward: Negative costs (minimize expenditure)
        rewards = -costs * w7

        # 1. Grid import penalty (w1)
        rewards -= w1 * grid_import * p_grid_norm

        # 2. P2P sell bonus (w2)
        rewards += w2 * actual_sold * p_peer_norm

        # 3. P2P buy bonus (w3): only if peer price is better than grid price
        buy_bonus_factor = (grid_price - peer_price) / self.max_grid_price
        buy_bonus = w3 * actual_bought * buy_bonus_factor
        rewards += np.where(peer_price < grid_price, buy_bonus, 0.0)

        # 4. SOC deviation penalty (w4): only for agents with batteries
        soc_frac = self.battery_soc / (self.battery_max_capacity + 1e-9)
        soc_penalties = w4 * ((soc_frac - 0.5) ** 2) * self.has_battery
        rewards -= soc_penalties

        # 5. Battery degradation penalty (w5)
        degrad_penalties = w5 * (charge_amount + discharge_amount) * self.battery_degradation_cost
        rewards -= degrad_penalties

        # 6. Fairness bonus (w6): applied equally to all agents in the cluster
        rewards += w6 * jfi
        return rewards

    def get_episode_metrics(self):
        """Return performance metrics for the last completed episode."""
        return self.episode_metrics