File size: 6,987 Bytes
55da406 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import gym
import numpy as np
import math
import sys
import os
import functools
import pandas as pd
# Ensure the SolarSys Environment package is importable: add this file's parent directory to the Python path.
# This relies on the project's directory layout (an `Environment` package located one level above this file).
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from Environment.solar_sys_environment import SolarSys
def form_clusters(metrics: dict, size: int) -> list:
    """
    Form balanced, heterogeneous clusters of houses.

    Each house is categorized by comparing its 'solar' and 'consumption'
    values against the medians over all houses. The categories are
    concatenated in the order producers, consumers, prosumers, neutrals
    (then any house that matched no category, e.g. when values are NaN),
    and that list is dealt round-robin into ``ceil(len(metrics) / size)``
    clusters so every cluster receives a mix of profiles.

    Args:
        metrics: Mapping of house id -> dict holding numeric 'consumption'
            and 'solar' entries.
        size: Target maximum number of houses per cluster; must be >= 1.

    Returns:
        A list of clusters, each a list of house ids. Empty when
        ``metrics`` is empty.

    Raises:
        ValueError: If ``metrics`` is non-empty and ``size`` is less than 1.
    """
    house_ids = list(metrics)
    if not house_ids:
        return []
    # Without this guard a zero size raises ZeroDivisionError and a negative
    # size yields zero clusters followed by an IndexError.
    if size < 1:
        raise ValueError(f"size must be a positive integer, got {size!r}")

    median_consumption = np.median([m['consumption'] for m in metrics.values()])
    median_solar = np.median([m['solar'] for m in metrics.values()])

    # Categorize each house based on its profile relative to the median.
    producers, consumers, prosumers, neutrals, uncategorized = [], [], [], [], []
    for house_id in house_ids:
        solar = metrics[house_id]['solar']
        consumption = metrics[house_id]['consumption']
        # Both directions are tested explicitly: with NaN values every
        # comparison is False and the house must fall through to the
        # uncategorized bucket instead of being misfiled.
        high_solar = solar >= median_solar
        low_solar = solar < median_solar
        high_consumption = consumption >= median_consumption
        low_consumption = consumption < median_consumption

        if high_solar and low_consumption:
            producers.append(house_id)
        elif low_solar and high_consumption:
            consumers.append(house_id)
        elif high_solar and high_consumption:
            prosumers.append(house_id)
        elif low_solar and low_consumption:
            neutrals.append(house_id)
        else:
            uncategorized.append(house_id)

    # Master list ordered by category; uncategorized houses go last so that
    # none are missed.
    final_house_list = producers + consumers + prosumers + neutrals + uncategorized

    num_clusters = math.ceil(len(house_ids) / size)
    clusters = [[] for _ in range(num_clusters)]
    # Round-robin deal spreads each category evenly across the clusters.
    for i, house_id in enumerate(final_house_list):
        clusters[i % num_clusters].append(house_id)
    return clusters
class GlobalPriceVecEnvWrapper(gym.vector.VectorEnvWrapper):
    """
    Vector-env wrapper that steps all cluster environments together and
    exposes per-cluster energy helpers for use by a coordinator.
    """

    def __init__(self, env, clusters: list):
        """Wrap ``env`` and remember the house-id ``clusters`` it was built from."""
        super().__init__(env)
        self.clusters = clusters
        # The individual sub-environments held by the wrapped vector env
        # (its ``envs`` list), kept so they can be inspected directly.
        self.cluster_envs = self.env.envs

    def step(self, actions: np.ndarray, exports: np.ndarray = None, imports: np.ndarray = None):
        """
        Step every cluster once.

        The low-level ``actions`` are packed together with one net transfer
        per cluster (``imports - exports``, or zeros unless both arrays are
        given) and a price column filled with -1.0, then forwarded to the
        wrapped vector env.

        Returns:
            ``(obs_next, rewards, done_all, info)`` where ``done_all`` is
            True only when every cluster is terminated or truncated, and
            ``info`` holds the per-cluster done flags under
            "cluster_dones" and the infos under "cluster_infos".
        """
        cluster_count = len(self.cluster_envs)

        if exports is None or imports is None:
            net = np.zeros(cluster_count)
        else:
            net = imports - exports

        transfer_column = net.reshape(-1, 1).astype(np.float32)
        price_column = np.full((cluster_count, 1), -1.0, dtype=np.float32)

        obs_next, rewards, terminateds, truncateds, infos = self.env.step(
            (actions, transfer_column, price_column)
        )

        cluster_dones = terminateds | truncateds
        everything_done = cluster_dones.all()
        if everything_done:
            # When all clusters finish, restack the per-env final infos into
            # one array per key.
            per_env_final = infos['final_info']
            stacked = {}
            for key in per_env_final[0].keys():
                stacked[key] = np.stack([entry[key] for entry in per_env_final])
            infos = stacked

        return obs_next, rewards, everything_done, {
            "cluster_dones": cluster_dones,
            "cluster_infos": infos,
        }

    def get_export_capacity(self, cluster_idx: int) -> float:
        """Returns the total physically exportable energy from a cluster's batteries and solar in kWh."""
        target = self.cluster_envs[cluster_idx]
        # Stored charge scaled by discharge efficiency, plus current solar.
        dischargeable = target.battery_soc * target.battery_discharge_efficiency
        return float(np.sum(dischargeable) + target.current_solar)

    def get_import_capacity(self, cluster_idx: int) -> float:
        """Returns the total physically importable space in a cluster's batteries in kWh."""
        target = self.cluster_envs[cluster_idx]
        headroom = target.battery_max_capacity - target.battery_soc
        return float(np.sum(headroom))

    def send_energy(self, from_cluster_idx: int, amount: float) -> float:
        """Ask the given cluster's environment to give up 'amount' of energy via its ``send_energy``; returns that call's result."""
        return self.cluster_envs[from_cluster_idx].send_energy(amount)

    def receive_energy(self, to_cluster_idx: int, amount: float) -> float:
        """Ask the given cluster's environment to take in 'amount' of energy via its ``receive_energy``; returns that call's result."""
        return self.cluster_envs[to_cluster_idx].receive_energy(amount)
def make_vec_env(data_path: str, time_freq: str, cluster_size: int, state: str):
    """
    Build a vectorized, cluster-based SolarSys environment.

    The CSV at ``data_path`` is loaded and resampled once, a temporary
    SolarSys instance is used to compute per-house consumption and solar
    totals, houses are grouped with ``form_clusters``, and one SolarSys
    environment per cluster (all sharing the same DataFrame) is placed in a
    ``gym.vector.SyncVectorEnv`` wrapped by ``GlobalPriceVecEnvWrapper``.

    Args:
        data_path: Path to the CSV dataset; it must contain a
            "local_15min" timestamp column.
        time_freq: Resampling frequency passed to ``DataFrame.resample``
            and forwarded to every SolarSys instance.
        cluster_size: Target maximum number of houses per cluster.
        state: Forwarded unchanged to every SolarSys instance.

    Returns:
        The ``GlobalPriceVecEnvWrapper`` around the per-cluster environments.

    Raises:
        ValueError: If the dataset cannot be loaded or resampled; the
            underlying exception is attached as ``__cause__``.
    """
    print("--- Pre-loading shared dataset for all environments ---")
    try:
        shared_df = pd.read_csv(data_path)
        shared_df["local_15min"] = pd.to_datetime(shared_df["local_15min"], utc=True)
        shared_df.set_index("local_15min", inplace=True)
        # Resample once here so that every environment receives the shared
        # data already at the requested frequency.
        shared_df = shared_df.resample(time_freq).mean()
    except Exception as e:
        # Chain explicitly so the original failure stays visible to callers.
        raise ValueError(f"Failed to pre-load data in make_vec_env: {e}") from e

    # Temporary environment over all houses, used only to derive metrics.
    base_env_for_metrics = SolarSys(
        data_path=data_path,
        time_freq=time_freq,
        preloaded_data=shared_df,  # Pass the shared DataFrame here
        state=state
    )

    # Per-house totals (negative values clipped to zero) drive the clustering.
    metrics = {}
    for hid in base_env_for_metrics.house_ids:
        total_consumption = float(
            np.clip(base_env_for_metrics.original_no_p2p_import[hid], 0.0, None).sum()
        )
        total_solar = float(
            base_env_for_metrics.all_data[f"total_solar_{hid}"].clip(lower=0.0).sum()
        )
        metrics[hid] = {'consumption': total_consumption, 'solar': total_solar}

    clusters = form_clusters(metrics, cluster_size)
    print(f"Formed {len(clusters)} clusters of size up to {cluster_size}.")

    # functools.partial defers construction: SyncVectorEnv calls each
    # factory itself to create the environment for one cluster.
    env_fns = [
        functools.partial(
            SolarSys,
            data_path=data_path,
            time_freq=time_freq,
            house_ids_in_cluster=cluster_house_ids,
            preloaded_data=shared_df,
            state=state
        )
        for cluster_house_ids in clusters
    ]

    sync_vec_env = gym.vector.SyncVectorEnv(env_fns)
    wrapped_vec_env = GlobalPriceVecEnvWrapper(sync_vec_env, clusters=clusters)
    return wrapped_vec_env