import functools
import math
import os
import sys

import gym
import numpy as np
import pandas as pd

# Ensure the SolarSys Environment package is on the Python path.
# This relies on the expected directory layout: the `Environment` package must
# live in the parent directory of this file.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from Environment.solar_sys_environment import SolarSys  # noqa: E402


def form_clusters(metrics: dict, size: int) -> list:
    """
    Form balanced, heterogeneous clusters of houses.

    Each house is categorized by comparing its solar and consumption totals
    against the population medians (producer, consumer, prosumer, neutral).
    The houses are then laid out category by category and dealt into clusters
    in a round-robin fashion, so every cluster receives a mix of profiles and
    cluster sizes differ by at most one.

    Args:
        metrics: Mapping of house id -> {'consumption': number, 'solar': number}.
        size: Maximum number of houses per cluster; must be >= 1.

    Returns:
        A list of ``ceil(len(metrics) / size)`` clusters, each a list of house
        ids. Returns ``[]`` when ``metrics`` is empty.

    Raises:
        ValueError: If ``metrics`` is non-empty and ``size`` is less than 1.
    """
    house_ids = list(metrics.keys())
    if not house_ids:
        return []
    if size < 1:
        # Without this guard size=0 divides by zero and a negative size
        # produces an empty cluster list that the round-robin then indexes.
        raise ValueError(f"Cluster size must be >= 1, got {size}")

    all_consumption = [m['consumption'] for m in metrics.values()]
    all_solar = [m['solar'] for m in metrics.values()]
    # house_ids is non-empty here, so both lists are non-empty.
    median_consumption = np.median(all_consumption)
    median_solar = np.median(all_solar)

    # Categorize each house based on its profile relative to the medians.
    producers = [
        h for h in house_ids
        if metrics[h]['solar'] >= median_solar
        and metrics[h]['consumption'] < median_consumption
    ]
    consumers = [
        h for h in house_ids
        if metrics[h]['solar'] < median_solar
        and metrics[h]['consumption'] >= median_consumption
    ]
    prosumers = [
        h for h in house_ids
        if metrics[h]['solar'] >= median_solar
        and metrics[h]['consumption'] >= median_consumption
    ]
    neutrals = [
        h for h in house_ids
        if metrics[h]['solar'] < median_solar
        and metrics[h]['consumption'] < median_consumption
    ]

    # Master list ordered by category, so round-robin dealing spreads each
    # category across all clusters.
    sorted_categorized_houses = producers + consumers + prosumers + neutrals

    # Houses matching no category (e.g. NaN metrics make every comparison
    # False) are appended so none are dropped.
    categorized_set = set(sorted_categorized_houses)
    uncategorized = [h for h in house_ids if h not in categorized_set]
    final_house_list = sorted_categorized_houses + uncategorized

    num_clusters = math.ceil(len(house_ids) / size)
    clusters = [[] for _ in range(num_clusters)]
    for i, house_id in enumerate(final_house_list):
        clusters[i % num_clusters].append(house_id)
    return clusters


class GlobalPriceVecEnvWrapper(gym.vector.VectorEnvWrapper):
    """
    Vector-env wrapper that packs inter-cluster energy transfers into each
    cluster's action and exposes per-cluster energy-transfer helpers.

    Note: ``step`` returns a 4-tuple ``(obs, rewards, done_all, info_agg)``,
    not the 5-tuple returned by the wrapped vector env.
    """

    def __init__(self, env, clusters: list):
        super().__init__(env)
        self.clusters = clusters
        # Expose the underlying SolarSys environments for inspection by the
        # coordinator; self.env.envs is the list of individual envs held by
        # the wrapped SyncVectorEnv.
        self.cluster_envs = self.env.envs

    def step(self, actions: np.ndarray, exports: np.ndarray = None, imports: np.ndarray = None):
        """
        Step every cluster environment with packed (action, transfer, price) tuples.

        Args:
            actions: Batched low-level actions, passed through unchanged.
            exports: Per-cluster exported energy. Only used together with
                ``imports``; if either is None, all transfers are zero.
            imports: Per-cluster imported energy.

        Returns:
            ``(obs_next, rewards, done_all, info_agg)`` where ``done_all`` is
            True only when every cluster is terminated or truncated, and
            ``info_agg`` holds ``"cluster_dones"`` and ``"cluster_infos"``.
        """
        num_clusters = len(self.cluster_envs)
        net_transfers = np.zeros(num_clusters)
        if exports is not None and imports is not None:
            # Positive when a cluster imports more than it exports. asarray
            # lets callers pass plain lists as well as arrays.
            net_transfers = np.asarray(imports, dtype=float) - np.asarray(exports, dtype=float)

        batched_transfers = net_transfers.reshape(-1, 1).astype(np.float32)
        # -1.0 fills the price slot for every cluster; presumably a sentinel
        # interpreted by SolarSys — TODO confirm against the environment.
        batched_prices = np.full((num_clusters, 1), -1.0, dtype=np.float32)
        final_packed_actions_tuple = (actions, batched_transfers, batched_prices)

        obs_next, rewards, terminateds, truncateds, infos = self.env.step(final_packed_actions_tuple)

        dones = terminateds | truncateds
        done_all = dones.all()
        if done_all:
            # When every cluster is done, read the per-env info dicts from
            # 'final_info' and restack them into one array per key.
            final_infos = infos['final_info']
            keys = final_infos[0].keys()
            infos = {k: np.stack([info[k] for info in final_infos]) for k in keys}

        info_agg = {
            "cluster_dones": dones,
            "cluster_infos": infos,
        }
        return obs_next, rewards, done_all, info_agg

    def get_export_capacity(self, cluster_idx: int) -> float:
        """Returns the total physically exportable energy from a cluster's batteries and solar in kWh."""
        cluster_env = self.cluster_envs[cluster_idx]
        available_from_batt = cluster_env.battery_soc * cluster_env.battery_discharge_efficiency
        # np.sum accepts current_solar as either a scalar or a per-house array;
        # adding a raw array here would make float() below fail.
        total_exportable = np.sum(available_from_batt) + np.sum(cluster_env.current_solar)
        return float(total_exportable)

    def get_import_capacity(self, cluster_idx: int) -> float:
        """Returns the total physically importable space in a cluster's batteries in kWh."""
        cluster_env = self.cluster_envs[cluster_idx]
        free_space = cluster_env.battery_max_capacity - cluster_env.battery_soc
        total_storable = np.sum(free_space)
        return float(total_storable)

    def send_energy(self, from_cluster_idx: int, amount: float) -> float:
        """Drains 'amount' of energy from the specified cluster (batteries first, then solar).

        Delegates to the cluster environment's own ``send_energy`` and returns its result.
        """
        cluster_env = self.cluster_envs[from_cluster_idx]
        return cluster_env.send_energy(amount)

    def receive_energy(self, to_cluster_idx: int, amount: float) -> float:
        """Charges batteries in the specified cluster with 'amount' of energy.

        Delegates to the cluster environment's own ``receive_energy`` and returns its result.
        """
        cluster_env = self.cluster_envs[to_cluster_idx]
        return cluster_env.receive_energy(amount)


def make_vec_env(data_path: str, time_freq: str, cluster_size: int, state: str):
    """
    Build a clustered, synchronous vector environment over SolarSys.

    The CSV is loaded and resampled once and shared by every environment.
    A SolarSys instance over all houses is used only to compute per-house
    totals for ``form_clusters``; one SolarSys environment is then created
    per cluster.

    Args:
        data_path: Path to the CSV dataset (must contain a ``local_15min`` column).
        time_freq: pandas resampling frequency string.
        cluster_size: Maximum number of houses per cluster.
        state: Passed through to every SolarSys constructor.

    Returns:
        A ``GlobalPriceVecEnvWrapper`` around a ``gym.vector.SyncVectorEnv``
        with one environment per cluster.

    Raises:
        ValueError: If the dataset cannot be loaded, parsed, or resampled.
    """
    print("--- Pre-loading shared dataset for all environments ---")
    try:
        shared_df = pd.read_csv(data_path)
        shared_df["local_15min"] = pd.to_datetime(shared_df["local_15min"], utc=True)
        shared_df.set_index("local_15min", inplace=True)
        # Resample once here so every environment shares the same aggregated frame.
        shared_df = shared_df.resample(time_freq).mean()
    except Exception as e:
        raise ValueError(f"Failed to pre-load data in make_vec_env: {e}") from e

    base_env_for_metrics = SolarSys(
        data_path=data_path,
        time_freq=time_freq,
        preloaded_data=shared_df,  # Pass the shared DataFrame here
        state=state,
    )

    # Per-house totals used for clustering: the non-negative part of the
    # no-P2P grid import (used as "consumption") and of solar generation.
    metrics = {}
    for hid in base_env_for_metrics.house_ids:
        total_consumption = float(
            np.clip(base_env_for_metrics.original_no_p2p_import[hid], 0.0, None).sum()
        )
        total_solar = float(
            base_env_for_metrics.all_data[f"total_solar_{hid}"].clip(lower=0.0).sum()
        )
        metrics[hid] = {'consumption': total_consumption, 'solar': total_solar}

    clusters = form_clusters(metrics, cluster_size)
    print(f"Formed {len(clusters)} clusters of size up to {cluster_size}.")

    # functools.partial defers construction so SyncVectorEnv builds each env itself.
    env_fns = [
        functools.partial(
            SolarSys,
            data_path=data_path,
            time_freq=time_freq,
            house_ids_in_cluster=cluster_house_ids,
            preloaded_data=shared_df,
            state=state,
        )
        for cluster_house_ids in clusters
    ]

    sync_vec_env = gym.vector.SyncVectorEnv(env_fns)
    wrapped_vec_env = GlobalPriceVecEnvWrapper(sync_vec_env, clusters=clusters)
    return wrapped_vec_env